みちのいに!!

自分のメモと、他にもハマる人がいそうなことを書く

BigQuery Write API を Python で

BigQuery Write APIはBigQueryにストリーミングでデータを突っ込む新しい(まだ正式版ではない)APIです。無料枠が月1TBと既存のinsertAllよりも太っ腹です。 gRPCでProtocolBufferを送りつけています。

cloud.google.com

まだJavaのサンプルしか公式サイトにない(2021-08-15時点)ので、Pythonのサンプルを書きました。

必要なもの

  • Python 多分>=3.7
  • bigquery.tables.updateData のついたサービスアカウント。bigquery.dataEditor以上があれば十分です。
  • USリージョンの空テーブル。東京はうまく動いてくれませんでした。
    • CREATE TABLE tmp.hello_stream (col STRING, val INT64);
  • ProtocolBufferのコンパイラ
    • apt-get install protobuf-compiler
  • ライブラリ

ProtocolBuffer周りの準備

sample.proto を作成。データスキーマを書きます。

syntax = "proto2";

package tutorial;

message Sample {
    required string col = 1;
    required int64 val = 2;
}

PythonでProtocolBufferを扱えるファイルを作成。sample_pb2.pyができます。

$ protoc -I=./ --python_out=./ sample.proto

本体

from google.cloud.bigquery_storage_v1beta2.services import big_query_write
from google.cloud.bigquery_storage_v1beta2.types import AppendRowsRequest, ProtoRows, ProtoSchema, CreateWriteStreamRequest, WriteStream 

from google.protobuf import descriptor_pb2
import sample_pb2

client = big_query_write.BigQueryWriteClient()
parent = client.table_path("dataset", "tmp", "hello_stream")

ws = client.create_write_stream(parent=parent, write_stream=WriteStream(type_=WriteStream.Type.COMMITTED))

row = sample_pb2.Sample()
row.col = "HOGE"
row.val = 334
row.SerializeToString()

pb = descriptor_pb2.DescriptorProto()
sample_pb2.Sample.DESCRIPTOR.CopyToProto(pb)
ar = AppendRowsRequest(
    write_stream=ws.name,
    proto_rows=AppendRowsRequest.ProtoData(
        writer_schema=ProtoSchema(proto_descriptor=pb),
        rows=ProtoRows(serialized_rows=[row.SerializeToString()]))
)

client.append_rows(requests=iter([ar]))