一般疑難排解

瞭解實用的疑難排解步驟,解決您在使用 Pub/Sub 時遇到的問題。

無法建立主題

確認您具備必要的權限。如要建立 Pub/Sub 主題,您需要專案的 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如果沒有這個角色,請與管理員聯絡。 如要進一步瞭解如何排解主題相關問題,請參閱下列頁面:

無法建立訂閱

請確認您已完成下列事項:

  • 確認您具備必要的權限。如要建立 Pub/Sub 訂閱項目,您必須在專案中具備 Pub/Sub 編輯者 (roles/pubsub.editor) IAM 角色。如果沒有這個角色,請與管理員聯絡。

  • 指定訂閱名稱。

  • 指定要附加訂閱項目的現有主題名稱。

  • 建立推送訂閱時,在 pushEndpoint 欄位中,以小寫 (而非 http://HTTPS://) 將 https:// 指定為接收 URL 的通訊協定。

如需訂閱項目的疑難排解資訊,請參閱下列頁面:

排解權限問題

Pub/Sub 權限可控管哪些使用者和服務帳戶可以對 Pub/Sub 資源執行動作。如果權限設定錯誤,可能會導致權限遭拒錯誤,並中斷郵件流程。稽核記錄會詳細記錄所有權限變更,方便您找出問題來源。

如要使用稽核記錄排解 Pub/Sub 權限問題,請按照下列步驟操作:

  1. 取得查看記錄檔探索工具的必要權限。

    詳情請參閱「事前準備」一文。

  2. 前往 Trusted Cloud 控制台的「Logs Explorer」頁面。

    前往記錄檔探索工具

  3. 選取現有的 Trusted Cloud 專案、資料夾或組織。

  4. 以下列出可用來尋找相關記錄的篩選器:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription":如果問題可能涉及主題或訂閱設定變更,或是存取權控管,請使用這項查詢做為疑難排解的起點。你也可以搭配其他篩選器,進一步修正搜尋結果。

    • protoPayload.methodName="google.iam.v1.SetIamPolicy":懷疑問題是由於權限不正確或遺漏時,請使用這項查詢。這項功能可協助您追蹤身分與存取權管理政策的變更者和變更內容。這有助於排解問題,例如使用者無法發布至主題或訂閱項目、應用程式遭拒存取 Pub/Sub 資源,或是存取權控管發生非預期變更。

    • protoPayload.status.code=7:如果遇到與權限明確相關的錯誤,請使用這項查詢。這有助於找出失敗的動作和嘗試執行這些動作的使用者。您可以將這項查詢與先前的查詢合併,找出可能導致權限遭拒的特定資源和 IAM 政策變更。

  5. 分析記錄,判斷事件的時間戳記、進行變更的主體,以及變更類型等因素。

  6. 您可以根據稽核記錄收集到的資訊採取修正措施。

排解 Terraform 權限問題

搭配 Terraform 使用 Pub/Sub 時,請在 Terraform 程式碼中明確授予必要角色。舉例來說,如要發布應用程式,應用程式的服務帳戶必須具備 roles/pubsub.publisher 角色。如果 Terraform 程式碼未明確定義這個角色,日後的 terraform apply 可能會移除該角色。這通常發生在不相關的更新期間,導致可靠的應用程式突然發生 PERMISSION_DENIED 錯誤。在程式碼中明確定義角色,即可避免發生這類意外回歸。

訂閱項目已刪除

刪除 Pub/Sub 訂閱項目主要有兩種方式:

  • 具備足夠權限的使用者或服務帳戶刻意刪除訂閱項目。

  • 訂閱項目會在閒置一段時間後自動刪除,預設為 31 天。如要進一步瞭解訂閱方案到期政策,請參閱「到期期限」一文。

如要排解已刪除的訂閱方案問題,請按照下列步驟操作:

  1. 前往 Trusted Cloud 控制台的 Pub/Sub 訂閱項目頁面,確認訂閱項目已不再列出。如要進一步瞭解如何列出訂閱項目,請參閱「列出訂閱項目」。

  2. 檢查稽核記錄。前往「記錄檔探索工具」。 使用篩選器 protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" 找出已刪除的訂閱項目。檢查記錄,判斷是否有人刪除訂閱項目,或訂閱項目是否因閒置而遭刪除。InternalExpireInactiveSubscription 表示訂閱項目因閒置而遭刪除。如要進一步瞭解如何使用稽核記錄排解問題,請參閱使用稽核記錄排解 Pub/Sub 問題

403 (Forbidden) 個錯誤

403 錯誤通常表示您沒有執行動作的適當權限。舉例來說,嘗試發布至主題或從訂閱項目提取內容時,可能會收到 403 User not authorized 錯誤訊息。

如果您是這類錯誤,請執行下列步驟:

  • 請確認您已在Trusted Cloud 控制台中啟用 Pub/Sub API。
  • 確定提出要求的主體在相關 Pub/Sub API 資源上具有所需權限,特別是您針對跨專案通訊使用 Pub/Sub API 時。

  • 如果您使用 Dataflow,請確認 {PROJECT_NUMBER}@cloudservices.s3ns-system.iam.gserviceaccount.com 和 Compute Engine 服務帳戶 {PROJECT_NUMBER}-compute@developer.s3ns-system.iam.gserviceaccount.com 均具備相關 Pub/Sub API 資源的必要權限。詳情請參閱「Dataflow 安全與權限」。

  • 如果您使用 App Engine,請檢查專案的「權限」頁面,確認 App Engine 服務帳戶是否列為 Pub/Sub 編輯者。如果不是,請將 App Engine 服務帳戶新增為 Pub/Sub 編輯者。通常,App Engine 服務帳戶的格式為 <project-id>@appspot.s3ns-system.iam.gserviceaccount.com

  • 您可以使用稽核記錄排解權限問題

其他常見錯誤代碼

如要查看其他常見的 Pub/Sub API 相關錯誤代碼及其說明,請參閱「錯誤代碼」。

使用過度管理作業

如果您發現自己使用太多管理作業配額,就可能需要重構程式碼。您可考慮使用此虛擬程式碼/虛擬碼作為實例。在本範例中,管理作業 (GET) 用於先檢查訂閱是否存在,然後再嘗試使用其資源。GETCREATE 都是管理員作業:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

更有效率的模式是嘗試由訂閱項目使用訊息 (假設您能夠合理確定訂閱項目名稱)。在這種樂觀的方法中,您只會在發生錯誤時取得或建立訂閱。請參閱以下範例:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

您可以使用下列程式碼範例,以所選語言實作這個模式:

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topic, subscriptionName string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subscriptionName)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription assuming it exists.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// If the subscription does not exist, then create the subscription.
				subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
					Name:  subscriptionName,
					Topic: topic,
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subscriptionName)

				// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
				// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
				// If a subscription ID is provided, the project ID from the client is used.
				sub = client.Subscriber(subscription.GetName())
				err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.ts

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number,
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);