使用 Storage Write API 串流資料
本文說明如何使用 BigQuery Storage Write API,將資料串流至 BigQuery。
在串流情境中,資料會持續傳送,且應可在讀取時以最短的延遲時間提供。將 BigQuery Storage Write API 用於串流工作負載時,請考慮您需要哪些保證:
- 如果應用程式只需要至少一次語義,請使用預設串流。
- 如果您需要確切一次語義,請在已提交類型中建立一或多個資料流,並使用資料流偏移值來確保確切一次寫入。
在已提交類型中,一旦伺服器確認寫入要求,寫入串流的資料就會立即可供查詢。預設串流也會使用已提交的類型,但不會提供確切一次保證。
使用預設串流來處理至少一次語意
如果應用程式可以接受在目標資料表中出現重複記錄的可能性,建議您在串流情境中使用預設串流。
以下程式碼說明如何將資料寫入預設串流:
Java
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。詳情請參閱 BigQuery Java API 參考說明文件。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
在執行程式碼範例之前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN
環境變數設為 s3nsapis.fr
。
Node.js
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
在執行程式碼範例之前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN
環境變數設為 s3nsapis.fr
。
Python
以下範例說明如何使用預設串流插入含有兩個欄位的記錄:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
這個程式碼範例會依附編譯的通訊協定模組 sample_data_pb2.py
。如要建立已編譯的模組,請執行 protoc --python_out=. sample_data.proto
指令,其中 protoc
是通訊協定緩衝區編譯器。sample_data.proto
檔案會定義 Python 範例中使用的訊息格式。如要安裝 protoc
編譯器,請按照「Protocol Buffers - Google 的資料交換格式」中的操作說明進行。
以下是 sample_data.proto
檔案的內容:
message SampleData {
required string name = 1;
required int64 age = 2;
}
這個指令碼會使用 entries.json
檔案,其中包含要插入 BigQuery 資料表的範例資料列:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
使用多工處理
您只需在串流寫入器層級啟用預設串流的多重取樣功能,如要在 Java 中啟用多工處理,請在建構 StreamWriter
或 JsonStreamWriter
物件時呼叫 setEnableConnectionPool
方法。
啟用連線集區後,Java 用戶端程式庫會在背景管理您的連線,如果系統認為現有連線過於繁忙,就會擴大連線。如要讓自動調整資源配置功能更有效,建議您降低 maxInflightRequests
限制。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
如要在 Go 中啟用多工處理,請參閱「連線共用 (多工處理)」一文。
使用已提交類型來實現「僅限一次」語意
如果您需要「僅限一次」寫入語意,請使用已提交類型建立寫入串流。在已提交類型中,只要用戶端收到後端的確認訊息,即可查詢記錄。
已提交的類型會透過使用記錄偏移量,在串流中提供「僅傳送一次」的傳送機制。應用程式會使用記錄偏移量,在每次對 AppendRows
的呼叫中指定下一個附加偏移量。只有在偏移值與下一個附加偏移值相符時,系統才會執行寫入作業。詳情請參閱「管理串流偏移值,以達到「一次一語義」」。
如果您未提供偏移量,系統會將記錄附加至串流的目前結尾。在這種情況下,如果附加要求傳回錯誤,重試可能會導致記錄在串流中出現多次。
如要使用已提交的類型,請執行下列步驟:
Java
- 呼叫
CreateWriteStream
以建立已提交類型的一或多個串流。 - 針對每個串流,在迴圈中呼叫
AppendRows
以寫入批次記錄。 - 針對每個串流呼叫
FinalizeWriteStream
,即可釋放串流。呼叫這個方法後,您就無法再將任何資料列寫入資料流。在已提交類型中,這個步驟為選用步驟,但有助於避免超過有效串流的限制。詳情請參閱「限制建立串流的頻率」。
Node.js
- 呼叫
createWriteStreamFullResponse
以建立已提交類型的一或多個串流。 - 針對每個串流,在迴圈中呼叫
appendRows
以寫入批次記錄。 - 針對每個串流呼叫
finalize
,即可釋放串流。呼叫這個方法後,您就無法再將任何資料列寫入資料流。在已提交類型中,這個步驟為選用步驟,但有助於避免超過有效串流的限制。詳情請參閱「限制建立串流的頻率」。
您無法明確刪除串流。串流會遵循系統定義的存留時間 (TTL):
- 如果沒有流量,已提交的串流 TTL 為三天。
- 如果緩衝串流沒有流量,則預設的存留時間為七天。
以下程式碼會展示如何使用已提交類型:
Java
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。詳情請參閱 BigQuery Java API 參考說明文件。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
在執行程式碼範例之前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN
環境變數設為 s3nsapis.fr
。
Node.js
如要瞭解如何安裝及使用 BigQuery 用戶端程式庫,請參閱「BigQuery 用戶端程式庫」。
如要向 BigQuery 進行驗證,請設定應用程式預設憑證。詳情請參閱「設定用戶端程式庫的驗證機制」。
在執行程式碼範例之前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN
環境變數設為 s3nsapis.fr
。
使用 Apache Arrow 格式擷取資料
下列程式碼說明如何使用 Apache Arrow 格式擷取資料。如需更詳細的端對端範例,請參閱 GitHub 上的 PyArrow 範例。
Python
本範例說明如何使用預設串流擷取序列化的 PyArrow 資料表。
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()