AWS SNS/SQS

AWS SNS/SQS 发布订阅组件的详细文档

组件格式

要设置 AWS SNS/SQS 发布订阅,请创建类型为 pubsub.aws.snssqs 的组件。

默认情况下,AWS SNS/SQS 组件:

  • 生成 SNS 主题
  • 预配 SQS 队列
  • 配置队列到主题的订阅
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
    - name: accessKey
      value: "AKIAIOSFODNN7EXAMPLE"
    - name: secretKey
      value: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    - name: region
      value: "us-east-1"
    # - name: consumerID # 可选。如果未提供,运行时将创建一个。
    #   value: "channel1"
    # - name: endpoint # 可选。 
    #   value: "http://localhost:4566"
    # - name: sessionToken  # 可选(如果使用 AssignedRole 则为必需;例如,临时 accessKey 和 secretKey)
    #   value: "TOKEN"
    # - name: messageVisibilityTimeout # 可选
    #   value: 10
    # - name: messageRetryLimit # 可选
    #   value: 10
    # - name: messageReceiveLimit # 可选
    #   value: 10
    # - name: sqsDeadLettersQueueName # 可选
    # - value: "myapp-dlq"
    # - name: messageWaitTimeSeconds # 可选
    #   value: 1
    # - name: messageMaxNumber # 可选
    #   value: 10
    # - name: fifo # 可选
    #   value: "true"
    # - name: fifoMessageGroupID # 可选
    #   value: "app1-mgi"
    # - name: disableEntityManagement # 可选
    #   value: "false"
    # - name: disableDeleteOnRetryLimit # 可选
    #   value: "false"
    # - name: assetsManagementTimeoutSeconds # 可选
    #   value: 5
    # - name: concurrencyMode # 可选
    #   value: "single"
    # - name: concurrencyLimit # 可选
    #   value: "0"

规范元数据字段

字段必填详情示例
accessKeyY具有对 SNS 和 SQS 适当权限的 AWS 账户/角色的 ID(见下文)"AKIAIOSFODNN7EXAMPLE"
secretKeyYAWS 用户/角色的密钥。如果使用 AssumeRole 访问,你还需要提供 sessionToken"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
regionYSNS/SQS 资源所在或将要创建的 AWS 区域。有关有效区域,请参阅此页面。确保 SNS 和 SQS 在该区域可用"us-east-1"
consumerIDN消费者 ID(消费者标签)将一个或多个消费者组织到一个组中。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,一条消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时会将其设置为 Dapr 应用程序 ID(appID)的值。请参阅发布订阅代理组件文件以了解 ConsumerID 如何自动生成。可以设置为字符串值(如上面示例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看你可以在组件元数据中使用的所有模板标签。
endpointN组件要使用的 AWS 端点。仅用于本地开发,例如使用 localstack。当针对生产环境 AWS 运行时,不需要 endpoint"http://localhost:4566"
sessionTokenN要使用的 AWS 会话令牌。仅当你使用临时安全凭证时才需要会话令牌"TOKEN"
messageReceiveLimitN在处理消息失败后,接收消息的次数,达到该次数后,会将该消息从队列中移除。如果指定了 sqsDeadLettersQueueNamemessageReceiveLimit 是在处理消息失败后接收消息的次数,达到该次数后,会将该消息移动到 SQS 死信队列。默认值:1010
sqsDeadLettersQueueNameN此应用程序的死信队列名称"myapp-dlq"
messageVisibilityTimeoutN消息发送给订阅者后,从接收请求中隐藏的时间(秒)。默认值:1010
messageRetryLimitN在处理消息失败后,从队列中移除该消息之前,重新发送该消息的次数。默认值:1010
messageWaitTimeSecondsN调用在返回之前等待消息到达队列的持续时间(秒)。如果有消息可用,则调用会早于 messageWaitTimeSeconds 返回。如果没有可用的消息且等待时间到期,则调用成功返回并返回空消息列表。默认值:11
messageMaxNumberN一次从队列接收的最大消息数。默认值:10,最大值:1010
fifoN使用 SQS FIFO 队列提供消息排序和去重。默认值:"false"。有关 SQS FIFO 的更多详情"true", "false"
fifoMessageGroupIDN如果启用了 fifo,指示 Dapr 为发布订阅部署使用自定义消息组 ID。这不是必需的,因为 Dapr 会为每个生产者创建自定义消息组 ID,从而确保每个 Dapr 生产者的消息排序。默认值:"""app1-mgi"
disableEntityManagementN当设置为 true 时,SNS 主题、SQS 队列以及 SNS 的 SQS 订阅不会自动创建。默认值:"false""true", "false"
disableDeleteOnRetryLimitN当设置为 true 时,在重试和处理消息失败 messageRetryLimit 次后,重置消息可见性超时,以便其他消费者可以尝试处理,而不是从 SQS 中删除该消息(默认行为)。默认值:"false""true", "false"
assetsManagementTimeoutSecondsNAWS 资源管理操作的超时时间(秒),超时后将取消操作。资源管理操作是在 STS、SNS 和 SQS 上执行的任何操作,除了实现默认 Dapr 组件重试行为的消息发布和消费操作。该值可以设置为任何非负浮点数/整数。默认值:50.5, 10
concurrencyModeN当从 SQS 批量接收消息时,按顺序(一次"单条"消息)或并发(“并行”)调用订阅者。默认值:"parallel""single", "parallel"
concurrencyLimitN定义处理消息的最大并发工作线程数。当 concurrencyMode 设置为 "single" 时,将忽略此值。要避免限制并发工作线程数,请将其设置为 0。默认值:0100

附加信息

符合 AWS 规范

Dapr 创建的 SNS 主题和 SQS 队列名称符合 AWS 规范。默认情况下,Dapr 根据消费者的 app-id 创建 SQS 队列名称,因此 Dapr 可能会执行名称标准化以符合 AWS 规范。

SNS/SQS 组件行为

当发布订阅 SNS/SQS 组件预配 SNS 主题时,SQS 队列和订阅在组件代表消息生产者(未部署订阅者应用程序)运行的情况下的行为,与存在订阅者应用程序(未部署发布者)的情况下的行为不同。

由于 SNS 在仅发布者设置中没有 SQS 订阅的工作方式,SQS 队列和订阅的行为类似于依赖于监听主题消息的订阅者的"经典"发布订阅系统。如果没有这些订阅者,消息:

  • 无法继续传递并会被有效丢弃
  • 对于未来的订阅者不可用(订阅者最终订阅时不会重放消息)

SQS FIFO

根据 AWS 规范,使用 SQS FIFO(fifo 元数据字段设置为 "true")提供消息排序和去重,但会降低 SQS 处理吞吐量,以及其他限制。

指定 fifoMessageGroupID 将所使用的 FIFO 队列的并发消费者数量限制为仅一个,但保证应用程序 Dapr 边车发布的消息的全局排序。请参阅此 AWS 博客文章以更好地了解消息组 ID 和 FIFO 队列的主题。

为避免丢失传递给消费者的消息顺序,SQS 组件的 FIFO 配置需要将 concurrencyMode 元数据字段设置为 "single"

默认并行 concurrencyMode

自 v1.8.0 起,该组件支持 "parallel" concurrencyMode 作为其默认模式。在以前的版本中,组件的默认行为是一次调用订阅者一条消息并等待其响应。

SQS 死信队列

在使用 SQS 死信队列配置发布订阅组件时,元数据字段 messageReceiveLimitsqsDeadLettersQueueName 必须同时设置为某个值。对于 messageReceiveLimit,该值必须大于 0,并且 sqsDeadLettersQueueName 不能为空字符串。

SNS/SQS 与 Dapr 的争用

从根本上说,SNS 通过创建对这些主题的 SQS 订阅,将来自多个发布者主题的消息聚合到单个 SQS 队列中。作为订阅者,SNS/SQS 发布订阅组件从该唯一的 SQS 队列消费消息。

然而,像任何 SQS 消费者一样,该组件无法有选择地检索发布到其特定订阅的 SNS 主题的消息。这可能导致组件接收来自没有关联处理程序的主题的消息。通常,这发生在:

  • 组件初始化: 如果基础设施订阅在组件订阅处理程序之前准备就绪,或
  • 关闭: 如果组件处理程序在基础设施订阅之前被移除。

由于此问题会影响多个 SNS 主题的任何 SQS 消费者,因此该组件无法防止消费来自缺少处理程序的主题的消息。当发生这种情况时,组件会记录一条错误,指示此类消息被错误地检索。

在这些情况下,未处理的消息将在每次拉取后在其接收计数递减的情况下重新出现在 SQS 中。因此,存在未处理的消息可能超过其 messageReceiveLimit 并丢失的风险。

创建 SNS/SQS 实例

对于本地开发,localstack 项目用于集成 AWS SNS/SQS。按照这些说明运行 localstack。

要使用 Docker 在本地从命令行运行 localstack,请应用以下命令:

docker run --rm -it -p 4566:4566 -p 4571:4571 -e SERVICES="sts,sns,sqs" -e AWS_DEFAULT_REGION="us-east-1" localstack/localstack

为了将 localstack 与你的发布订阅绑定一起使用,你需要在组件元数据中提供 endpoint 配置。当针对生产环境 AWS 运行时,不需要 endpoint

有关身份验证相关属性的信息,请参阅向 AWS 进行身份验证

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
    - name: accessKey
      value: "anyString"
    - name: secretKey
      value: "anyString"
    - name: endpoint
      value: http://localhost:4566
    # 如果提供给了 localstack,则使用 us-east-1 或任何其他区域,如 "AWS_DEFAULT_REGION" 环境变量所定义
    - name: region
      value: us-east-1

要在 Kubernetes 上运行 localstack,你可以应用以下配置。然后可以通过 DNS 名称 http://localstack.default.svc.cluster.local:4566 访问 Localstack(假设此配置应用于默认命名空间),该名称应作为 endpoint 使用。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: localstack
spec:
  # 使用选择器,我们将暴露正在运行的部署
  # Kubernetes 知道给定的服务属于某个部署
  selector:
    matchLabels:
      app: localstack
  replicas: 1
  template:
    metadata:
      labels:
        app: localstack
    spec:
      containers:
      - name: localstack
        image: localstack/localstack:latest
        ports:
          # 暴露边缘端点
          - containerPort: 4566
---
kind: Service
apiVersion: v1
metadata:
  name: localstack
  labels:
    app: localstack
spec:
  selector:
    app: localstack
  ports:
  - protocol: TCP
    port: 4566
    targetPort: 4566
  type: LoadBalancer

为了在 AWS 中运行,请创建或分配具有对 SNS 和 SQS 服务权限的 IAM 用户,并使用类似以下的策略:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "YOUR_POLICY_NAME",
      "Effect": "Allow",
      "Action": [
        "sns:CreateTopic",
        "sns:GetTopicAttributes",
        "sns:ListSubscriptionsByTopic",
        "sns:Publish",
        "sns:Subscribe",
        "sns:TagResource",
        "sqs:ChangeMessageVisibility",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue"
      ],
      "Resource": [
        "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:*",
        "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:*"
      ]
    }
  ]
}

使用 Kubernetes 密钥和 secretKeyRefAWS 账户 IDAWS 账户密钥插入组件元数据中的 accessKeysecretKey

或者,假设你想使用自己选择的工具(例如 Terraform)预配 SNS 和 SQS 资源,同时防止 Dapr 动态执行此操作。你需要启用 disableEntityManagement 并为你的使用 Dapr 的应用程序分配一个 IAM 角色,并使用类似以下的策略:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "YOUR_POLICY_NAME",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sns:Publish",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes"

      ],
      "Resource": [
        "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:APP_TOPIC_NAME",
        "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:APP_ID"
      ]
    }
  ]
}

在上面的示例中,你在 EKS 集群上运行应用程序,并启用了动态资源创建(默认的 Dapr 行为)。

相关链接