連続データパイプラインを作ってみる

連続データパイプラインの作成方法をご紹介します。
以下のような方にお勧めとなります。

  • 継続してファイルの内容を取り込みたい
  • ファイル読み込みの監視は行いたくない
  • ウェアハウスの管理、変化する処理量にあわせたスケーリングなどはSnowflakeに任せたい

目次

  1. 連続データパイプラインとは?
  2. 外部ステージの準備
  3. Snowpipeの作り方
     ①準備
     ②Snowpipe作成
     ③権限の設定
     ④イベント通知設定
  4. ストリームの作り方
  5. タスクの作り方
     ①取込先のテーブルを準備
     ②タスクを作成
     ③タスクの実行権限を付与
     ④タスクの起動
  6. データパイプラインを試してみる
  7. 後処理
  8. まとめ

1.連続データパイプラインとは?

データパイプラインは、連続的なデータロードに伴う多くの手順を自動化します。
多くの場合、生データはテーブルに一時的にロードされ、その後にSQLによって変換されて保存先テーブルに挿入されます。
この流れをsnowflakeの以下機能を組み合わせて作成することができます。

①連続データロードの機能

②連続データパイプ

③変更データの追跡

④定期的なタスク

・タスク

今回は①Snowpipe + ③ストリーム + ④タスク を使用して以下のデータパイプラインを作成します。

2.外部ステージの準備

まずは外部ステージを作成します。
作成する外部ステージは、AWSのS3バケット上の特定のフォルダを参照するため、S3バケットに対するデータの読み書き権限が必要になります。
snowflakeのストレージ統合オブジェクトを使うことで、S3バケットに対する読込を許可するIAMロールの権限を取得できます。
ストレージ統合オブジェクト、ストレージ統合を参照する外部ステージは、以下のドキュメントを参考に作成してしてください。

以降の手順では、作成した外部ステージ「MY_S3_STAGE_GREEN」を使用します。

3.Snowpipeの作り方

Snowpipeを使用するとファイルがステージで利用可能になったことを検知して、ファイルからデータを継続的にロードできます。

①準備

今回ロードするファイルは、以下のようなJSON形式のデータを使用します。

Snowpipeがデータをロードする先のテーブルを作成します。
以下のCREATE文を実行して、JSON形式のデータを格納するためのVARIANT型のカラムだけを持つテーブルを作成します。

②Snowpipe作成

今回はAWSの「S3イベント通知」をトリガーとして、データをロードするようにします。
auto_ingest = true と指定することで、メッセージサービスからイベント通知を受信したときに、指定した外部ステージからデータファイルを自動的にロードします。

③権限の設定

Snowpipeを使用するには、次の権限を持つロールが必要となります。

オブジェクト権限
名前付きパイプOWNERSHIP
名前付きステージUSAGE , READ
名前付きファイル形式USAGE
ターゲットデータベースUSAGE
ターゲットスキーマUSAGE
ターゲットテーブルINSERT , SELECT

※名前付きファイル形式は作成した外部ステージが名前付きファイル形式を参照する場合にのみ必要です。
 今回は使用しないため、権限の確認は行いません。

今回はユーザ「SNOWPIPE_USER」に権限を設定します。
※「最小権限」の原則に従うためパイプを使用する個別のユーザとロールを作成することが推奨されています。

設定した権限を確認します。

必要な権限が付与されていることが確認できました。

④イベント通知設定

AWSのS3バケットのイベント通知を構成して、新しいデータをロードできるようになったときにSnowpipeに通知します。

ARNを取得

SHOW PIPES コマンドを実行します。

取得結果

notification_channel 列にあるSQS キューの ARN をコピーします。

S3バケットのイベント通知を構成

Amazon S3コンソールにログインします。
Amazon S3ドキュメント に記載されている手順を使用して、S3バケットのイベント通知を構成します。

AWSの管理コンソールから、外部ステージとして使用するS3バケットの画面を開き
「プロパティ」タブの「イベント通知を作成」を選択します。

  • イベント名:任意の名前を入力します。
  • イベント:「すべてのオブジェクト作成イベント」を選択します。
  • プレフィックス:外部ステージの配下にある特定のフォルダ内のファイルをロードしたい場合は指定します。
  • 送信先:「SQS キュー」を選択します。
  • SQS キュー を特定:「SQS キュー ARN を入力」を選択し、先ほど notification_channel 列から取得したSQS キューのARNを入力します。

上記を選択/入力したら「変更の保存」をします。

4.ストリームの作り方

ストリームはテーブルに加えられた変更(挿入、更新、削除など)を追跡できるオブジェクトです。
変更および変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。
この機能を利用して、外部ステージからデータをロードするテーブル「Raw」のレコードに対する差分を検出するストリームを作成します。

5.タスクの作り方

タスクは以下のいずれかを実行できます。

  • 単一の SQL ステートメント
  • ストアドプロシージャへの呼び出し
  • Snowflakeスクリプトを使用した手続き型ロジック

今回は定期実行する単純なタスクを作成します。
タスクの実行がスケジュールされるたびに、ストリームにテーブルの変更データが含まれているかどうかを確認し、
変更データを消費するか、変更データが存在しない場合は現在の実行をスキップします。

「JSON形式のデータをテーブル「Raw」からストリームを利用して読み込み、各カラムに分割してテーブル「Sales」に取り込む」という処理のタスクを作成します。

①取込先のテーブルを準備
②タスクを作成
③タスクの実行権限を付与

タスクの起動には「グローバル権限」の「EXECUTE TASK」が必要なため、ACCOUNTADMINでロール「snowpipe1」に権限を付与します。
タスクの所有者ロール「SYSADMIN」がタスクを実行できるように「snowpipe1」を割り当てます。

④タスクの起動

新しく作成したタスクは「一時停止済み」の状態のため、タスクを起動します。

6.データパイプラインを試してみる

実行前のテーブルを確認します。
※SELECTは、テーブルを作成したロール「SYSADMIN」で実施します。
 ロール「snowpipe1」はテーブル「raw」のSELECT権限は付与されていますが、「sales」の権限は付与されていないためです。

S3にファイルを配置します。

テーブルを確認します。

テーブル「raw」にレコードが3件登録されました。

テーブル「sales」に指定した通りカラムが分割された状態で3件登録されました。

7.後処理

タスクが動き続けてしまうため、停止します。

今回はJSON形式のデータをロードしましたが、Snowpipeは半構造化データ型を含む全てのデータ型がサポートされています。
また、複数のタスクを実行したりストアドプロシージャを呼び出すなどもできるため、要望に合ったデータ変換を試してみてください。