ユーザー定義関数(UDF)の概要

JavaScript のユーザー定義関数(UDF)は、単一メッセージ変換(SMT)の一種です。UDF は、BigQuery JavaScript UDF と同様に、Pub/Sub 内でカスタム変換ロジックを実装する柔軟な方法を提供します。

UDF は入力として 1 つのメッセージを受け取り、入力に対して定義されたアクションを実行し、プロセスの結果を返します。

UDF には次の主なプロパティがあります。

  • 関数名: Pub/Sub がメッセージに適用する、指定されたコード内の JavaScript 関数の名前。

  • コード: 変換ロジックを定義する JavaScript コード。このコードには、次のシグネチャを持つ関数が含まれている必要があります。

    /**
    * Transforms a Pub/Sub message.
    * @return {(Object<string, (string | Object<string, string>)>|* null)} - To
    * filter a message, return `null`. To transform a message, return a map with
    * the following keys:
    *   - (required) 'data' : {string}
    *   - (optional) 'attributes' : {Object<string, string>}
    * Returning empty `attributes` will remove all attributes from the message.
    *
    * @param  {(Object<string, (string | Object<string, string>)>} - Pub/Sub
    * message. Keys:
    *   - (required) 'data' : {string}
    *   - (required) 'attributes' : {Object<string, string>}
    *
    * @param  {Object<string, any>} metadata - Pub/Sub message metadata.
    * Keys:
    *   - (optional) 'message_id'  : {string}
    *   - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
    *   - (optional) 'ordering_key': {string}
    */
    function <function_name>(message, metadata) {
      // Perform custom transformation logic
      return message; // to filter a message instead, return `null`
    }
    

入力

  • message 引数: Pub/Sub メッセージを表す JavaScript オブジェクト。次のプロパティが含まれます。

    • data:(String、必須)メッセージ ペイロード。

    • attributes:(Object<String, String>、省略可)メッセージ属性を表すキーと値のペアのマップ。

  • metadata 引数: Pub/Sub メッセージに関する不変のメタデータを含む JavaScript オブジェクト。

    • message_id:(String、省略可)メッセージの一意の ID。

    • publish_time:(String、省略可)メッセージのパブリッシュ時刻(RFC 3339 形式(YYYY-MM-DDTHH:mm:ssZ))。

    • ordering_key:(String、省略可)該当する場合、メッセージの順序指定キー

出力

  • メッセージを変換するには、message.datamessage.attributes の内容を編集し、変更された message オブジェクトを返します。

  • メッセージをフィルタするには、null を返します。

UDF がメッセージを変換する方法

メッセージに対して UDF を実行した結果は、次のいずれかになります。

  • UDF はメッセージを変換します。

  • UDF は null を返します。

    • トピック SMT: Pub/Sub はパブリッシャーに成功を返し、フィルタされたメッセージのレスポンスにメッセージ ID を含めます。Pub/Sub はメッセージを保存せず、サブスクライバーに送信しません。

    • サブスクリプション SMT: Pub/Sub は、サブスクライバーにメッセージを送信せずにメッセージ配信を確認応答します。

  • UDF がエラーをスローします。

    • トピック SMT: Pub/Sub はパブリッシャーにエラーを返し、メッセージをパブリッシュしません。

    • サブスクリプション SMT: Pub/Sub はメッセージを否定応答します。

制限事項

Pub/Sub は、効率的な変換オペレーションを保証するために、UDF にリソースの上限を適用します。制限事項は次のとおりです。

  • UDF あたり最大 20 KB のコード
  • メッセージあたりの最大実行時間 500 ミリ秒
  • ECMAScript 標準組み込みのみをサポート
  • 外部 API への呼び出しなし
  • 外部ライブラリのインポートなし

サンプル UDF

パブリッシュとサブスクライブの UDF の例を次に示します。

関数: 曜日の整数を対応する文字列に変換します。

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。

  1. Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は dayOfWeek というフィールドを探し、このフィールドの値が 0 ~ 6 の数値である場合は、Monday などの対応する曜日に変換します。フィールドが存在しない場合、または数値が 0 ~ 6 の範囲にない場合、コードは dayOfWeek フィールドを Unknown に設定します。

  3. UDF は、変更されたペイロードをメッセージにシリアル化して戻します。

  4. Pub/Sub は、更新されたメッセージをパイプラインの次のステップに渡します。

function intToString(message, metadata) {
  const data = JSON.parse(message.data);
  switch(`data["dayOfWeek"]`) {
    case 0:
      data["dayOfWeek"] = "Sunday";
      break;
    case 1:
      data["dayOfWeek"] = "Monday";
      break;
    case 2:
      data["dayOfWeek"] = "Tuesday";
      break;
    case 3:
      data["dayOfWeek"] = "Wednesday";
      break;
    case 4:
      data["dayOfWeek"] = "Thursday";
      break;
    case 5:
      data["dayOfWeek"] = "Friday";
      break;
    case 6:
      data["dayOfWeek"] = "Saturday";
      break;
    default:
      data["dayOfWeek"] = "Unknown";
  }
  message.data = JSON.stringify(data);
  return message;
}

機能: 社会保障番号を秘匿化する

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。

  1. Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は、メッセージ ペイロードからフィールド ssn を削除します(存在する場合)。

  3. UDF は、変更されたペイロードをメッセージにシリアル化して戻します。

  4. Pub/Sub は、更新されたメッセージをパイプラインの次のステップに渡します。

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

機能: 特定のメッセージをフィルタして自動的に確認応答する

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。

  1. Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は、ペイロードに region というフィールドが含まれているかどうかを確認します。

  3. region フィールドの値が US でない場合、関数は null を返し、Pub/Sub はメッセージをフィルタします。

  4. region フィールドの値が US の場合、Pub/Sub は元のメッセージをパイプラインの次のステップに渡します。

function filterForUSRegion(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["region"] !== "US") {
    return null;
  }
  return message;
}

関数: メッセージの内容を検証して、金額が 100 を超えていないことを確認します

次の UDF をトピックまたはサブスクリプションに追加すると、メッセージのパブリッシュまたは配信時に次の変更が行われます。

  1. Pub/Sub はメッセージに関数を適用します。メッセージに JSON ペイロードがない場合、UDF はエラーをスローします。

  2. UDF は、メッセージに amount というフィールドが含まれているかどうかを確認します。

  3. amount フィールドの値が 100 より大きい場合、関数はエラーをスローします。

  4. amount フィールドの値が 100 より大きくない場合、関数は元のメッセージを返します。

  5. Pub/Sub は、メッセージを失敗としてマークするか、元のメッセージをパイプラインの次のステップに渡します。

function validateAmount(message, metadata) {
  const data = JSON.parse(message.data);
  if (data["amount"] > 100) {
    throw new Error("Amount is invalid");
  }
  return message;
}

次のステップ