BigQuery Write API を Python で
BigQuery Write APIはBigQueryにストリーミングでデータを突っ込む新しい(まだ正式版ではない)APIです。無料枠が月1TBと既存のinsertAllよりも太っ腹です。 gRPCでProtocolBufferを送りつけています。
まだJavaのサンプルしか公式サイトにない(2021-08-15時点)ので、Pythonのサンプルを書きました。
必要なもの
- Python 多分>=3.7
- bigquery.tables.updateData のついたサービスアカウント。bigquery.dataEditor以上があれば十分です。
- USリージョンの空テーブル。東京はうまく動いてくれませんでした。
CREATE TABLE tmp.hello_stream (col STRING, val INT64);
- ProtocolBufferのコンパイラ
apt-get install protobuf-compiler
- ライブラリ
pip3 install google-cloud-bigquery-storage>=2.6.3
- BigQueryWrite — google-cloud-bigquery-storage documentation
ProtocolBuffer周りの準備
sample.proto を作成。データスキーマを書きます。
syntax = "proto2"; package tutorial; message Sample { required string col = 1; required int64 val = 2; }
PythonでProtocolBufferを扱えるファイルを作成。sample_pb2.pyができます。
$ protoc -I=./ --python_out=./ sample.proto
本体
from google.cloud.bigquery_storage_v1beta2.services import big_query_write from google.cloud.bigquery_storage_v1beta2.types import AppendRowsRequest, ProtoRows, ProtoSchema, CreateWriteStreamRequest, WriteStream from google.protobuf import descriptor_pb2 import sample_pb2 client = big_query_write.BigQueryWriteClient() parent = client.table_path("dataset", "tmp", "hello_stream") ws = client.create_write_stream(parent=parent, write_stream=WriteStream(type_=WriteStream.Type.COMMITTED)) row = sample_pb2.Sample() row.col = "HOGE" row.val = 334 row.SerializeToString() pb = descriptor_pb2.DescriptorProto() sample_pb2.Sample.DESCRIPTOR.CopyToProto(pb) ar = AppendRowsRequest( write_stream=ws.name, proto_rows=AppendRowsRequest.ProtoData( writer_schema=ProtoSchema(proto_descriptor=pb), rows=ProtoRows(serialized_rows=[row.SerializeToString()])) ) client.append_rows(requests=iter([ar]))