[Python]Cloud DataflowからCloud SQLに接続する

  • #Cloud Dataflow
  • #Cloud SQL
  • #Google Cloud Platform
  • #Python

はじめに

こんにちは。開発チームの南です。 GCPのETLサービスであるCloud Dataflowですが、 BigQueryやDatastoreとの連携は、SDKとして用意されていますがCloud SQLはありません。 ということで今回はCloud DataflowからCloud SQLに接続する方法について紹介します。

対象読者

Cloud Dataflowを利用したことがある。 Cloud DataflowからCloud SQLを利用したい。

手順

以下の手順で実装していきます。

  1. Cloud SQLのプライベートIPを有効する
  2. Cloud DataflowのParDoにてCloud SQLに接続する
  3. Cloud Dataflow起動時にCloud SQLに接続しているネットワークを指定する

1. Cloud SQLのプライベートIPを有効する

Cloud SQLインスタンスにプライベートIPを使用して接続します。 また、プライベート接続を作成するネットワークを選択する必要があります。 今回は「default」というネットワークを選択しました。

CloudSQLインスタンスの設定

テスト用のテーブルを以下のように作成しました。

CREATE DATABASE connect_by_dataflow DEFAULT CHARACTER SET utf8;
USE connect_by_dataflow;
SET SQL_SAFE_UPDATES=0;

CREATE TABLE `users` (
    `id` CHAR(8) not null primary key,
    `name` VARCHAR(100) not null
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

2. Cloud DataflowのParDoにてCloud SQLに接続する

ここからは、Cloud DataflowからCloud SQLに接続するコードを書いていきます。 まずは、apache beamの公式が提供しているパッケージとmysqlに接続するためパッケージをインストールします。

pip install 'apache-beam[gcp] mysql-connector-python

次にCloud Dataflowを起動するためのソースコードを書きます。 インメモリからデータを読み込み、先ほと作成したCloud SQLのusersテーブルに挿入するコードになっています。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import mysql.connector as mydb
import argparse
import logging


class WriteToCloudSQL(beam.DoFn):
    def __init__(self, host, password):
        self.host = host
        self.password = password
    
    def process(self, message):
        id = message["id"]
        name = message["name"]
        try:
            conn= mydb.connect(
                host=self.host,
                user="root",
                password=self.password,
                database="connect_by_dataflow"
            )
            cursor = conn.cursor()
            insert = "INSERT INTO users VALUES(%s,%s)"
            cursor.execute(insert, (id, name))
        except Exception as e:
            print(e)
        conn.commit()
        conn.close()

def run(
    host,
    password,
    pipeline_args=None
): 
    pipeline_options=PipelineOptions(
        pipeline_args,
        streaming=False,
        save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as p:
        (p
        | beam.Create([
            {"id": "20220001", "name": "curucuru太郎"}
        ]) 
        | beam.ParDo(WriteToCloudSQL(host, password))
        )
if __name__=='__main__':
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--cloudsql_host"
    )
    parser.add_argument(
        "--cloudsql_password"
    )

    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.cloudsql_host,
        known_args.cloudsql_password,
        pipeline_args
    )

3. Cloud Dataflow起動時にCloud SQLに接続しているネットワークを指定する

最後に、作成したCloud Dataflowを起動します。 起動時に、Cloud SQLに接続しているネットワークを指定する必要があります。 また、mysql接続時に指定するhostは、Cloud SQLのプライベートIPにします。

python connect_cloudsql.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--temp_location=${TEMP_LOCATION} \
--runner=DataflowRunner \
--requirements_file=./requirements.txt \
--machine_type=n1-standard-1 \
--network=${CLOUDSQL_NETWORK} \
--cloudsql_host=${CLOUDSQL_PRIVATE_IP} \
--cloudsql_password=${CLOUDSQL_PASSWORD}

最後に

今回は、Cloud DataflowからCloud SQLに接続する方法を紹介しました。 今回、紹介した方法が皆さんの参考になれば幸いです。

CURUCURUでは、仲間になってくれるエンジニアの方を募集しております! 少しでも興味をお持ちいただけたら、気軽にカジュアル面談しませんか? ご応募お待ちしております! https://www.wantedly.com/companies/curucuru