使用 Storage Write API 串流資料
本文說明如何使用 BigQuery Storage Write API,將資料串流至 BigQuery。
在串流情境中,資料會持續傳送,且應可在讀取時以最短的延遲時間提供。將 BigQuery Storage Write API 用於串流工作負載時,請考慮您需要哪些保證:
- 如果應用程式只需要至少一次語意,請使用預設串流。
- 如果您需要確切一次語義,請在已提交類型中建立一或多個串流,並使用串流偏移量來確保確切一次寫入。
在已提交類型中,一旦伺服器確認寫入要求,寫入串流的資料就會立即可供查詢。預設串流也會使用已提交的類型,但不會提供確切一次保證。
使用預設串流進行至少一次語意
如果應用程式可以接受在目標資料表中出現重複記錄的可能性,建議您在串流情境中使用預設串流。
以下程式碼說明如何將資料寫入預設串流:
Java
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。詳情請參閱 BigQuery Java API 參考說明文件。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
Node.js
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
Python
以下範例說明如何使用預設串流插入含有兩個欄位的記錄:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
這個程式碼範例會依附編譯的通訊協定模組 sample_data_pb2.py
。如要建立已編譯的模組,請執行 protoc --python_out=. sample_data.proto
指令,其中 protoc
是通訊協定緩衝區編譯器。sample_data.proto
檔案會定義 Python 範例中使用的訊息格式。如要安裝 protoc
編譯器,請按照「Protocol Buffers - Google 的資料交換格式」中的操作說明進行。
以下是 sample_data.proto
檔案的內容:
message SampleData {
required string name = 1;
required int64 age = 2;
}
這個指令碼會使用 entries.json
檔案,其中包含要插入 BigQuery 資料表的範例資料列:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
使用多工處理
您只需在串流寫入器層級啟用預設串流的多重取樣功能,如要在 Java 中啟用多工處理,請在建構 StreamWriter
或 JsonStreamWriter
物件時呼叫 setEnableConnectionPool
方法。
啟用連線集區後,Java 用戶端程式庫會在背景管理您的連線,如果系統認為現有連線過於繁忙,就會擴大連線。如要讓自動調整資源配置功能更有效,建議您降低 maxInflightRequests
限制。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
如要在 Go 中啟用多工處理,請參閱「連線共用 (多工處理)」一文。
使用已提交類型來實現「僅限一次」語意
如果您需要「僅限一次」寫入語意,請使用已提交類型建立寫入串流。在已提交類型中,只要用戶端收到後端的確認訊息,即可查詢記錄。
已提交的類型會透過使用記錄偏移量,在串流中提供「僅傳送一次」的傳送機制。應用程式會使用記錄偏移量,在每次對 AppendRows
的呼叫中指定下一個附加偏移量。只有在偏移值與下一個附加偏移值相符時,系統才會執行寫入作業。詳情請參閱「管理串流偏移值,以達到「一次一語義」」。
如果您未提供偏移量,系統會將記錄附加至資料流的目前結尾。在這種情況下,如果附加要求傳回錯誤,重試可能會導致記錄在串流中出現多次。
如要使用已提交的類型,請執行下列步驟:
Java
- 呼叫
CreateWriteStream
以建立已提交類型的一或多個串流。 - 針對每個串流,在迴圈中呼叫
AppendRows
以寫入批次記錄。 - 針對每個串流呼叫
FinalizeWriteStream
,即可釋放串流。呼叫這個方法後,您就無法再將任何資料列寫入資料流。在已提交類型中,這個步驟為選用步驟,但有助於避免超過有效串流的限制。詳情請參閱「限制建立串流的頻率」。
Node.js
- 呼叫
createWriteStreamFullResponse
以建立已提交類型的一或多個串流。 - 針對每個串流,在迴圈中呼叫
appendRows
以寫入批次記錄。 - 針對每個串流呼叫
finalize
,即可釋放串流。呼叫這個方法後,您就無法再將任何資料列寫入資料流。在已提交類型中,這個步驟為選用步驟,但有助於避免超過有效串流的限制。詳情請參閱「限制建立串流的頻率」。
您無法明確刪除串流。串流會遵循系統定義的存留時間 (TTL):
- 如果沒有流量,已提交的串流 TTL 為三天。
- 如果緩衝串流沒有流量,則預設的存留時間為七天。
以下程式碼會展示如何使用已提交的類型:
Java
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。詳情請參閱 BigQuery Java API 參考說明文件。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
Node.js
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
使用 Apache Arrow 格式擷取資料
下列程式碼說明如何使用 Apache Arrow 格式擷取資料。如需更詳細的端對端範例,請參閱 GitHub 上的 PyArrow 範例。
Python
本範例說明如何使用預設串流擷取序列化的 PyArrow 資料表。
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()