Cloud DataflowをCloud Functionsから起動する
こんにちは。開発チームの南です。 今回はCloud DataflowをCloud Functinsから起動する方法について紹介しようと思います。
はじめに
Cloud Dataflowを使っていると、特定のタイミングで実行したい時ってありますよね。 そんな時の方法の一つとして、Cloud FunctionsからCloud Dataflowを起動する方法を紹介します。
対象読者
Cloud Dataflowを利用したことがある Cloud Dataflowを任意のタイミングで実行したい
手順
以下のような手順で作成していきます。
- Dataflowテンプレートを作成する
- カスタムDockerイメージでDataflow テンプレートジョブを作成する
- Cloud FunctionsからDataflowテンプレートジョブを起動する
1. Dataflowテンプレートを作成する
apache-beamをインストールします
pip install ‘apache-beam[gcp]’
dataflowを実行する以下のようなファイルを作成します。 辞書型の文字列をCloud Storageにtextファイルとして保存する簡単な処理となっています。
import apache_beam as beam
import logging
import argparse
from apache_beam.pipeline import PipelineOptions
def run(
gcs_output_path,
pipeline_args=None
):
pipeline_options = PipelineOptions(
pipeline_args,
streaming=False,
save_main_session=True,
)
with beam.Pipeline(options=pipeline_options) as p:
(p
| "Create Elements" >> beam.Create({
"key1": "value1",
"key2": "value2"
})
| "Write to GCS" >> beam.io.WriteToText(gcs_output_path))
if __name__=='__main__':
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--gcs_output_path"
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.gcs_output_path,
pipeline_args
)
2. カスタムDockerイメージでDataflow Flex テンプレートを作成する
DataflowをCloud Functionsから利用できるようにするためのテンプレートを作成します。 テンプレートは、Dockerイメージを利用して作成します。 まず、以下のようなDockerfileを作成します。 baseイメージはGCPの公式のものを使用すると便利です。 https://cloud.google.com/dataflow/docs/reference/flex-templates-launcher-images
FROM gcr.io/dataflow-templates-base/python39-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
COPY . .
RUN apt-get update && apt-get install -y
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/write_to_gcs.py"
RUN pip install --upgrade pip
RUN pip install apache-beam[gcp]==2.38.0
作成したDockerイメージをbuildします。
gcloud builds submit -t ${IMAGE_URL} --project=${PROJECT_NAME};
metadata.jsonファイルを作成します。 metadata.jsonを利用することで、dataflowを起動する際のパラメータを追加できます。 今回は、出力するCloud Storageのパスをパラメータとして指定できるようにしました。
{
"name": "Streaming beam Python flex template",
"parameters": [
{
"name": "gcs_output_path",
"label": "gcs_output_path",
"helpText": "gcs outpu path"
}
]
}
dataflow flexテンプレートを作成します。
gcloud dataflow flex-template build ${FLEX_TEMPLATE_PATH} \
--image ${IMAGE_URL} \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"
3. Cloud FunctionsからDataflowテンプレートジョブを起動する
作成したDataflowテンプレートをCloud Functionsから起動します。 Cloud Functionsはgo言語で作成しました。
package launch
import (
"context"
"log"
"net/http"
"os"
dataflow "cloud.google.com/go/dataflow/apiv1beta3"
dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3"
)
var (
GCSOutputPath string
ProjectName string
Location string
FlexTemplateGCSPath string
MachineType string
JobName string
)
func init() {
ProjectName = os.Getenv("PROJECT_NAME")
Location = os.Getenv("LOCATION")
FlexTemplateGCSPath = os.Getenv("FLEX_TEMPLATE_GCS_PATH")
JobName = os.Getenv("JOB_NAME")
GCSOutputPath = os.Getenv("GCS_OUTPUT_PATH")
}
func LaunchDataflowTest(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
client, err := dataflow.NewFlexTemplatesClient(ctx)
if err != nil {
w.WriteHeader(500)
return
}
defer client.Close()
req := mapLaunchFlexTemplateRequest()
log.Println(req)
_, err = client.LaunchFlexTemplate(ctx, req)
if err != nil {
w.WriteHeader(500)
return
}
}
func mapLaunchFlexTemplateRequest() *dataflowpb.LaunchFlexTemplateRequest {
return &dataflowpb.LaunchFlexTemplateRequest{
ProjectId: ProjectName,
Location: Location,
LaunchParameter: mapLaunchFlexTemplateParameter(),
}
}
func mapLaunchFlexTemplateParameter() *dataflowpb.LaunchFlexTemplateParameter {
return &dataflowpb.LaunchFlexTemplateParameter{
JobName: JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{
ContainerSpecGcsPath: FlexTemplateGCSPath,
},
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
MachineType: MachineType,
},
Parameters: map[string]string{
"gcs_output_path": GCSOutputPath,
},
}
}
Cloud Functionsをデプロイします。
gcloud functions deploy LaunchDataflowTest \
--project=${PROJECT_NAME} \
--allow-unauthenticated \
--trigger-http \
--region=${REGION} \
--runtime=go116 \
--source=./../launch \
--set-env-vars \
PROJECT_NAME=${PROJECT_NAME},\
LOCATION=${REGION},\
FLEX_TEMPLATE_GCS_PATH=${FLEX_TEMPLATE_PATH},\
JOB_NAME=${JOB_NAME},\
MACHENE_TYPE=${MACHENE_TYPE},\
GCS_OUTPUT_PATH=${GCS_FILE_OUTPUT}
Cloud Functionsを起動します。 (urlをプロジェクト名・リージョンによって変更してください)
curl https://asia-northeast1-curucuru-logs.cloudfunctions.net/LaunchDataflowTest
Dataflowが起動し、正常に動作することを確認できました。
最後に
今回は、Cloud DataflowをCloud Functionsから起動する方法について見てきました。 今回、紹介した方法が皆さんの参考になれば幸いです。
CURUCURUでは、仲間になってくれるエンジニアの方を募集しております! 少しでも興味をお持ちいただけたら、気軽にカジュアル面談しませんか? ご応募お待ちしております! https://www.wantedly.com/companies/curucuru