AWS MQ 消息队列

概述

Amazon MQ 是 Apache ActiveMQ 和 RabbitMQ 的托管式消息代理服务,提供高可用的消息队列和发布/订阅功能。相比自建消息代理,Amazon MQ 免除了运维负担,支持单可用区和多可用区部署模式,兼容现有 RabbitMQ 和 ActiveMQ 客户端。

核心特性:

  • 全托管式 ActiveMQ 和 RabbitMQ
  • 多可用区部署(高可用,故障自动切换)
  • 与 IAM、VPC、私有链接深度集成
  • 支持强加密(静态 + 传输)
  • 兼容 JMS 1.1/2.0、AMQP 1.0、MQTT、WebSocket
  • 监控指标深度集成 CloudWatch

部署模式对比

特性 单实例(单 AZ) 备用实例(多 AZ)
部署 1 个代理节点 1 主 + 1 备用跨 AZ
高可用 无自动故障切换 自动故障切换(RTO < 30s)
价格 较低 约 2x 单实例
适用 开发测试、非关键业务 生产环境

ActiveMQ vs RabbitMQ 选择

特性 ActiveMQ Classic RabbitMQ
协议 AMQP 1.0, JMS, MQTT, OpenWire AMQP 0-9-1, STOMP, MQTT, HTTP
多租户 虚拟目的地 虚拟主机(vhost)
集群 被动主从(Replicated LevelDB) 主动集群(Quorum Queue)
管理界面 Web Console Web UI + CLI
消息堆积 一般 优秀(Lazy Queue)
推荐场景 企业集成(ESB)、JMS 老应用 微服务、云原生应用

创建 ActiveMQ 代理

Console 创建步骤

1. 打开 Amazon MQ 控制台 → 选择 ActiveMQ 引擎

2. 选择代理引擎版本(推荐 5.17.6 或 5.18)

3. 选择部署模式:备用实例(生产)或 单实例(测试)

4. 选择实例类型(mq.m5.large 起)

5. 配置 VPC、子网、安全组

6. 开启公开访问(仅测试用)或保持私有

7. 配置用户名/密码

8. 确认后创建(约 15 分钟)

AWS CLI 创建


# 创建 ActiveMQ 代理(多 AZ 高可用)
aws mq create-broker   --broker-name prod-activemq   --broker-version "5.17.6"   --engine-type ActiveMQ   --engine-version "5.17.6"   --host-instance-type mq.m5.large   --deployment-mode ACTIVE_STANDBY_MULTI_AZ   --subnet-ids subnet-0abcd1234efgh5678 subnet-0ijkl9012mnop3456   --security-groups sg-0abcd1234efgh5678   --maintenance-window-start-time "dayOfWeek=MONDAY,timeOfDay=04:00,timeZone=UTC"   --logs "general=true"   --publicly-accessible   --authentication-strategy simple   --ldap metadata     "SystemUsername=cn=admin,dc=example,dc=com"     "SystemPassword=YourSecurePassword123!"     "ServiceUsername=admin"     "ServicePassword=YourSecurePassword123!"   --output json

# 创建 RabbitMQ 代理
aws mq create-broker   --broker-name prod-rabbitmq   --broker-version "3.12.13"   --engine-type RabbitMQ   --engine-version "3.12.13"   --host-instance-type mq.m5.large   --deployment-mode ACTIVE_STANDBY_MULTI_AZ   --rabbitmq-cluster-configuration '{"InstanceType":"mq.m5.large","Principal":"arn:aws:iam::123456789012:role/RabbitMQRole","Users":[{"Password":"YourSecurePassword123!","Username":"admin"},{"Password":"UserPassword456!","Username":"appuser"}]}'   --subnet-ids subnet-0abcd1234efgh5678 subnet-0ijkl9012mnop3456   --security-groups sg-0abcd1234efgh5678   --publicly-accessible   --output json

ActiveMQ 使用

连接 ActiveMQ


from qpid_interop import qpid
import stomp
import json

# STOMP 连接(推荐,跨语言)
conn = stomp.Connection([('b-xxxxxxx-1.activemq.us-east-1.amazonaws.com', 61614)])
conn.connect('admin', 'YourSecurePassword123!', wait=True, headers={'client-id': 'app-01'})

# JMS 连接(Java 专用)
from javax.jms import Session, TextMessage
from org.apache.activemq import ActiveMQConnectionFactory

factory = ActiveMQConnectionFactory(
    'failover:(ssl://b-xxxxxxx-1.activemq.us-east-1.amazonaws.com:61617,ssl://b-xxxxxxx-2.activemq.us-east-1.amazonaws.com:61617)?randomize=false'
)

队列操作


import stomp

class MyListener(stomp.ConnectionListener):
    def on_message(self, frame):
        print(f"收到消息: {frame.body}")
        headers = frame.headers
        msg_id = headers.get('message-id')
        # 手动 ACK
        conn.ack(msg_id, subscription=headers.get('subscription'))

conn = stomp.Connection([('b-xxxxxxx-1.activemq.us-east-1.amazonaws.com', 61614)])
conn.set_listener('', MyListener())
conn.connect('admin', 'YourSecurePassword123!')
conn.subscribe(destination='/queue/orders', id='sub-1', ack='client-individual')

# 发送消息
conn.send(body=json.dumps({'order_id': '12345', 'amount': 199.00}),
          destination='/queue/orders',
          persistent='true',
          headers={'content-type': 'application/json'})

conn.disconnect()

发布/订阅(Topic)


# 发布者
conn.send(body=json.dumps({'event': 'user.created', 'user_id': 1000}),
          destination='/topic/app-events',
          persistent='true')

# 订阅者( durable subscriber)
conn.subscribe(destination='/topic/app-events', id='sub-app-01', ack='auto')

死信队列(DLQ)配置

默认情况下,ActiveMQ 会将消费失败的消息发送到 ActiveMQ.DLQ。建议为每个业务队列配置独立的 DLQ:


<!-- activemq.xml 高级配置 -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue="orders">
        <deadLetterStrategy>
          <individualDeadLetterQueue queuePrefix="DLQ." useQueueForQueueMessages="true"/>
        </deadLetterStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

RabbitMQ 使用

连接 RabbitMQ


import pika
import json

# AMQP 连接(SSL)
credentials = pika.PlainCredentials('admin', 'YourSecurePassword123!')
ssl_options = pika.SSLOptions(
    context=ssl.create_default_context(),
    server_hostname='b-xxxxxxx-1.rabbitmq.us-east-1.amazonaws.com'
)

params = pika.ConnectionParameters(
    host='b-xxxxxxx-1.rabbitmq.us-east-1.amazonaws.com',
    port=5671,
    credentials=credentials,
    ssl=ssl_options,
    heartbeat=60,
    blocked_connection_timeout=300
)

connection = pika.BlockingConnection(params)
channel = connection.channel()

队列操作


# 声明队列
channel.queue_declare(queue='orders', durable=True,
                      arguments={'x-message-ttl': 86400000})  # 24h TTL

# 发布消息
channel.basic_publish(
    exchange='',
    routing_key='orders',
    body=json.dumps({'order_id': '12345', 'amount': 199.00}),
    properties=pika.BasicProperties(
        delivery_mode=2,  # 持久化
        content_type='application/json',
        expiration='86400000'  # 24h
    )
)

# 消费消息
def callback(ch, method, properties, body):
    order = json.loads(body)
    print(f"处理订单: {order}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=False)
channel.start_consuming()

交换机与路由


# 声明交换机
channel.exchange_declare(exchange='app.events', exchange_type='topic', durable=True)

# 绑定队列到交换机
channel.queue_bind(exchange='app.events', queue='orders', routing_key='order.*')
channel.queue_bind(exchange='app.events', queue='notifications', routing_key='user.*')

# 发布到交换机
channel.basic_publish(
    exchange='app.events',
    routing_key='order.created',
    body=json.dumps({'event': 'order.created', 'data': {...}})
)

高可用与故障切换

ActiveMQ 自动故障切换


import stomp

# 故障切换连接字符串(两个 AZ 的 broker 均填入)
brokers = [
    ('b-xxxxxxx-1.activemq.us-east-1.amazonaws.com', 61614),
    ('b-xxxxxxx-2.activemq.us-east-1.amazonaws.com', 61614)
]
conn = stomp.Connection(brokers)
conn.connect('admin', 'YourSecurePassword123!', wait=True)

RabbitMQ 高可用

RabbitMQ ActiveMQ 风格集群在 Amazon MQ 中自动配置:

  • 主节点故障时,备用节点自动接管
  • 客户端需在连接字符串中配置所有节点
  • 队列默认镜像到所有节点(ha-mode: all

# RabbitMQ 连接多个节点实现故障切换
params = pika.ConnectionParameters(
    host='b-xxxxxxx-1.rabbitmq.us-east-1.amazonaws.com',
    port=5671,
    # 如果第一个节点失败,pika 会尝试连接第二个节点
    connection_attempts=3,
    retry_delay=5
)

监控指标

CloudWatch 关键指标

指标 说明 告警阈值建议
ConsumerCount 消费者数量 骤降为 0 需关注
MessageCount 队列消息数 持续堆积需处理
ProducerCount 生产者数量 骤降可能影响生产
EnqueueCount 入队消息数 监控流量
DequeueCount 出队消息数 与入队比判断堆积
ExpiredCount TTL 过期消息数 大量过期需检查消费
MemoryUsage 内存使用率 > 80% 告警
DiskUsage 磁盘使用率 > 70% 告警

# 设置消息堆积告警
aws cloudwatch put-metric-alarm   --alarm-name prod-mq-queue-depth-high   --alarm-description "ActiveMQ 队列消息堆积超过 1000"   --metric-name MessageCount   --namespace AWS/AmazonMQ   --statistic Maximum   --period 300   --evaluation-periods 2   --threshold 1000   --comparison-operator GreaterThanThreshold   --dimensions "Name=Broker,Value=prod-activemq,Name=Queue,Value=orders"   --alarm-actions arn:aws:sns:us-east-1:123456789012:ops-alerts

日志监控


# 查看代理日志
aws mq list-broker-instance-options   --broker prod-activemq

# 通过 CloudWatch Logs 查看
aws logs tail /aws/amazonmq/prod-activemq/general --follow

安全配置

安全组配置

方向 协议 端口 来源
入站 TCP 61617(ActiveMQ OpenWire/AMQP SSL) 应用服务器 SG
入站 TCP 5671(RabbitMQ AMQP SSL) 应用服务器 SG
入站 TCP 8883(MQTT) IoT 设备 SG
入站 TCP 8162(Web Console) 运维 VPN

IAM 策略(RabbitMQ)


{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["mq:GetConnection*", "mq:DescribeBroker"],
      "Resource": "arn:aws:mq:us-east-1:123456789012:broker:prod-rabbitmq:*"
    }
  ]
}

迁移与运维

从自建 ActiveMQ 迁移

1. 使用 Apache Camel 或 Spring Boot + Spring JMS 编写迁移程序

2. 双写期:新旧系统同时写入,验证数据一致性

3. 灰度切换消费者到 Amazon MQ

4. 确认消费无遗漏后停止旧系统

维护窗口操作


# 重启代理(滚动重启,零 downtime)
aws mq reboot-broker   --broker-id prod-activemq

# 增加代理实例类型(需重启)
aws mq update-broker   --broker-id prod-activemq   --host-instance-type mq.m5.xlarge

常见问题

Q: ActiveMQ 连接经常断开?

A: 1) 检查安全组是否允许所有相关端口;2) 心跳设置不当导致防火墙断连(transport.heartRate);3) 连接超时设置过短(建议 30s+);4) 确认使用 SSL 端口(61617)而非普通端口(61616)。

Q: 消息丢失?

A: 1) 生产者端:使用事务会话(Session.SESSION_TRANSACTED)或开启 publisher confirms;2) 持久化消息:确保 persistent=true;3) 消费者端:使用手动 ACK,消息处理成功后再 ACK;4) 死信队列监控,排查被丢弃的消息。

Q: RabbitMQ 队列消息堆积?

A: 1) 消费者不足或处理慢,增加消费者实例;2) 消费者异常未正常 ACK,消息被 requeue;3) 检查 prefetch_count 是否设置过小;4) 查看 RabbitMQ Management UI 确认消费者状态。