Cloud DataflowをCloud Functionsから起動する

  • #Cloud Dataflow
  • #Cloud Functions
  • #Google Cloud Platform(GCP)
  • #Python
  • #Go言語

こんにちは。開発チームの南です。 今回はCloud DataflowをCloud Functinsから起動する方法について紹介しようと思います。

はじめに

Cloud Dataflowを使っていると、特定のタイミングで実行したい時ってありますよね。 そんな時の方法の一つとして、Cloud FunctionsからCloud Dataflowを起動する方法を紹介します。

対象読者

Cloud Dataflowを利用したことがある Cloud Dataflowを任意のタイミングで実行したい

手順

以下のような手順で作成していきます。

  1. Dataflowテンプレートを作成する
  2. カスタムDockerイメージでDataflow テンプレートジョブを作成する
  3. Cloud FunctionsからDataflowテンプレートジョブを起動する

1. Dataflowテンプレートを作成する

apache-beamをインストールします

pip install ‘apache-beam[gcp]

dataflowを実行する以下のようなファイルを作成します。 辞書型の文字列をCloud Storageにtextファイルとして保存する簡単な処理となっています。

import apache_beam as beam
import logging
import argparse
from apache_beam.pipeline import PipelineOptions

def run(
    gcs_output_path,
    pipeline_args=None
):
    pipeline_options = PipelineOptions(
        pipeline_args,
        streaming=False,
        save_main_session=True,
    )
    with beam.Pipeline(options=pipeline_options) as p:
        (p
        | "Create Elements" >> beam.Create({
            "key1": "value1",
            "key2": "value2"
        })
        | "Write to GCS" >> beam.io.WriteToText(gcs_output_path))

if __name__=='__main__':
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--gcs_output_path"
    )

    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.gcs_output_path,
        pipeline_args
    )

2. カスタムDockerイメージでDataflow Flex テンプレートを作成する

DataflowをCloud Functionsから利用できるようにするためのテンプレートを作成します。 テンプレートは、Dockerイメージを利用して作成します。 まず、以下のようなDockerfileを作成します。 baseイメージはGCPの公式のものを使用すると便利です。 https://cloud.google.com/dataflow/docs/reference/flex-templates-launcher-images

FROM gcr.io/dataflow-templates-base/python39-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY . .
RUN apt-get update && apt-get install -y

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/write_to_gcs.py"

RUN pip install --upgrade pip
RUN pip install apache-beam[gcp]==2.38.0

作成したDockerイメージをbuildします。

gcloud builds submit -t ${IMAGE_URL} --project=${PROJECT_NAME};

metadata.jsonファイルを作成します。 metadata.jsonを利用することで、dataflowを起動する際のパラメータを追加できます。 今回は、出力するCloud Storageのパスをパラメータとして指定できるようにしました。

{
    "name": "Streaming beam Python flex template",
    "parameters": [
        {
            "name": "gcs_output_path",
            "label": "gcs_output_path",
            "helpText": "gcs outpu path"
        }
    ]
}

dataflow flexテンプレートを作成します。

gcloud dataflow flex-template build ${FLEX_TEMPLATE_PATH} \
    --image ${IMAGE_URL} \
    --sdk-language "PYTHON" \
    --metadata-file "metadata.json"

3. Cloud FunctionsからDataflowテンプレートジョブを起動する

作成したDataflowテンプレートをCloud Functionsから起動します。 Cloud Functionsはgo言語で作成しました。

package launch

import (
    "context"
    "log"
    "net/http"
    "os"

    dataflow "cloud.google.com/go/dataflow/apiv1beta3"
    dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3"
)

var (
    GCSOutputPath       string
    ProjectName         string
    Location            string
    FlexTemplateGCSPath string
    MachineType         string
    JobName             string
)

func init() {
    ProjectName = os.Getenv("PROJECT_NAME")
    Location = os.Getenv("LOCATION")
    FlexTemplateGCSPath = os.Getenv("FLEX_TEMPLATE_GCS_PATH")
    JobName = os.Getenv("JOB_NAME")
    GCSOutputPath = os.Getenv("GCS_OUTPUT_PATH")
}

func LaunchDataflowTest(w http.ResponseWriter, r *http.Request) {
    ctx := context.Background()
    client, err := dataflow.NewFlexTemplatesClient(ctx)
    if err != nil {
        w.WriteHeader(500)
        return
    }
    defer client.Close()

    req := mapLaunchFlexTemplateRequest()
    log.Println(req)
    _, err = client.LaunchFlexTemplate(ctx, req)
    if err != nil {
        w.WriteHeader(500)
        return
    }
}

func mapLaunchFlexTemplateRequest() *dataflowpb.LaunchFlexTemplateRequest {
    return &dataflowpb.LaunchFlexTemplateRequest{
        ProjectId:       ProjectName,
        Location:        Location,
        LaunchParameter: mapLaunchFlexTemplateParameter(),
    }
}

func mapLaunchFlexTemplateParameter() *dataflowpb.LaunchFlexTemplateParameter {
    return &dataflowpb.LaunchFlexTemplateParameter{
        JobName: JobName,
        Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{
            ContainerSpecGcsPath: FlexTemplateGCSPath,
        },
        Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
            MachineType: MachineType,
        },
        Parameters: map[string]string{
            "gcs_output_path": GCSOutputPath,
        },
    }
}

Cloud Functionsをデプロイします。

    gcloud functions deploy LaunchDataflowTest \
    --project=${PROJECT_NAME} \
    --allow-unauthenticated \
    --trigger-http \
    --region=${REGION} \
    --runtime=go116 \
    --source=./../launch \
    --set-env-vars \
    PROJECT_NAME=${PROJECT_NAME},\
    LOCATION=${REGION},\
    FLEX_TEMPLATE_GCS_PATH=${FLEX_TEMPLATE_PATH},\
    JOB_NAME=${JOB_NAME},\
    MACHENE_TYPE=${MACHENE_TYPE},\
    GCS_OUTPUT_PATH=${GCS_FILE_OUTPUT}

Cloud Functionsを起動します。 (urlをプロジェクト名・リージョンによって変更してください)

curl https://asia-northeast1-curucuru-logs.cloudfunctions.net/LaunchDataflowTest

Dataflowが起動し、正常に動作することを確認できました。

最後に

今回は、Cloud DataflowをCloud Functionsから起動する方法について見てきました。 今回、紹介した方法が皆さんの参考になれば幸いです。

CURUCURUでは、仲間になってくれるエンジニアの方を募集しております! 少しでも興味をお持ちいただけたら、気軽にカジュアル面談しませんか? ご応募お待ちしております! https://www.wantedly.com/companies/curucuru