本文最后更新于:2021年6月15日 晚上
Ons消息队列
前引:公司项目需要写一个推广系统,推广系统是一个项目,和主项目分离,项目间的通信使用Ons来完成,以下是公司代码略微修改的结果,以供参考。
术语:
Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。
Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。
Producer ID:一类 Producer 的集合名称,这类 Producer 通常发送一类消息,丏发送逻辑一致。
Consumer ID:一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,丏消费逻辑一致。
广播消费:一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer ID,消息也会被 Consumer ID中的每个 Consumer 都消费一次,广播消费中的 Consumer ID 概念可以讣为在消息划分方面无意义。
集群消费:一个 Consumer ID 中的 Consumer 实例平均分摊消费消息。 例如某个 Topic 有 9 条消息, 其中一个 ConsumerId 有 3 个实例(可能是 3 个进程,或者 3 台机器) ,那么每个实例只消费其中的 3 条消息
引入依赖
| <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.1.2</version> </dependency>
|
一、发送消息
1、在阿里云控制台的消息队列中配置 Topic、Producer ID, Consumer ID
2、创建Ons消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Service public class ProducerService implements IProducerService {
private Producer producer;
@Override public void sendMessage(String topic, String tag, String content) { try { byte[] bytes = content.getBytes("UTF-8"); Message msg = new Message(topic, tag, bytes);
SendResult sendResult = producer.send(msg); System.out.println("SendMessageIng... topic:" + topic + " tag:" + tag + " "+ sendResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); }
}
@PostConstruct public void init() { Properties properties = new Properties(); properties.put(PropertyKeyConst.ProducerId, ProducerConstants.PRODUCERID_PCHOME); properties.put(PropertyKeyConst.AccessKey, ProducerConstants.ACCESSID); properties.put(PropertyKeyConst.SecretKey, ProducerConstants.ACCESSKEY); producer = ONSFactory.createProducer(properties);
producer.start(); }
@PreDestroy public void destory() { producer.shutdown(); } }
|
3、创建发送消息类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import net.minidev.json.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener;
@Service public class OnsService implements IOnsService, MessageListener { @Resource private IProducerService producerService; public static final String TOPIC = "TopicTestONS"; public static final String TOPIC_TAG = "TagA"; @Override public Action consume(Message message, ConsumeContext context) { return null; }
@Override public void sendBindObject(int uid, int tid, int eid, String pageUrl) { JSONObject json = new JSONObject(); json.put("uid", uid); json.put("tid", tid); json.put("eid", eid); json.put("pageUrl", pageUrl); producerService.sendMessage(TOPIC, TOPIC_TAG, json.toJSONString()); } }
|
二、订阅消息
1、创建Ons消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Service @Lazy(value=false) public class ConsumerService implements IConsumerService { private Consumer consumer;
@PostConstruct public void init() { System.out.println("init...."); Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, ConsumerConstants.CONSUMERID); properties.put(PropertyKeyConst.AccessKey,ConsumerConstants.ACCESSID); properties.put(PropertyKeyConst.SecretKey,ConsumerConstants.ACCESSKEY); properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING); consumer = ONSFactory.createConsumer(properties); consumer.start(); }
@Override public void subscribe(final String topic, final String subExpression, final MessageListener listener) { consumer.subscribe(topic, subExpression, listener); }
@PreDestroy public void desotry() { if (consumer != null) { consumer.shutdown(); } }
}
|
2、创建消费消息类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Service public class OnsConsumerService implements IOnsConsumerService { @Resource private IConsumerService consumerService; public static final String TOPIC = "TopicTestONS";
@PostConstruct public void init() { System.out.println("starting..."); onsConsumerService.subscribe(TOPIC, "*", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { JSONObject parse = (JSONObject) JSONValue.parse(message.getBody()); int uid = JsonUtil.getInt(parse, "uid"); int tid = JsonUtil.getInt(parse, "tid"); int eid = JsonUtil.getInt(parse, "eid"); String pageUrl = JsonUtil.getString(parse, "pageUrl"); System.out.println(uid + tid + eid + pageUrl); return Action.CommitMessage; } }); } }
|
以上为Ons的集群订阅消息代码。
参考:
Ons官方文档
http://onsteam.oss-cn-hangzhou.aliyuncs.com/ALIYUN_ONS_USER_GUIDE.pdf