Ons消息队列

本文最后更新于:2021年6月15日 晚上

Ons消息队列

前引:公司项目需要写一个推广系统,推广系统是一个项目,和主项目分离,项目间的通信使用Ons来完成,以下是公司代码略微修改的结果,以供参考。

术语:

Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。

Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

Producer ID:一类 Producer 的集合名称,这类 Producer 通常发送一类消息,丏发送逻辑一致。

Consumer ID:一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,丏消费逻辑一致。

广播消费:一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer ID,消息也会被 Consumer ID中的每个 Consumer 都消费一次,广播消费中的 Consumer ID 概念可以讣为在消息划分方面无意义。

集群消费:一个 Consumer ID 中的 Consumer 实例平均分摊消费消息。 例如某个 Topic 有 9 条消息, 其中一个 ConsumerId 有 3 个实例(可能是 3 个进程,或者 3 台机器) ,那么每个实例只消费其中的 3 条消息

  • 引入依赖

    • Maven方式
    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.1.2</version>
    </dependency>
    • jar包方式

      http://onsall.oss-cn-hangzhou.aliyuncs.com/aliyun-ons-client-java.tar.gz


一、发送消息

1、在阿里云控制台的消息队列中配置 Topic、Producer ID, Consumer ID

2、创建Ons消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
public class ProducerService implements IProducerService {

private Producer producer;

@Override
public void sendMessage(String topic, String tag, String content) {
try {
byte[] bytes = content.getBytes("UTF-8");
Message msg = new Message(topic, tag, bytes);

SendResult sendResult = producer.send(msg);
System.out.println("SendMessageIng... topic:" + topic + " tag:" + tag + " "+ sendResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}

}

@PostConstruct
public void init() {
Properties properties = new Properties();
//PropertyKeyConst 是Ons官方定义的常量
//需要在阿里云控制台创建 AccessKey,SecretKey
properties.put(PropertyKeyConst.ProducerId, ProducerConstants.PRODUCERID_PCHOME);
//PRODUCERID_PCHOME是在控制台配置Topic 的 Producer ID
properties.put(PropertyKeyConst.AccessKey, ProducerConstants.ACCESSID);
properties.put(PropertyKeyConst.SecretKey, ProducerConstants.ACCESSKEY);
producer = ONSFactory.createProducer(properties);

// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
}

@PreDestroy
public void destory() {
// 在应用退出前,销毁Producer对象
// 注意:如果不销毁也没有问题
producer.shutdown();
}
}

3、创建发送消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import net.minidev.json.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;

@Service
public class OnsService implements IOnsService, MessageListener {

@Resource
private IProducerService producerService;
public static final String TOPIC = "TopicTestONS";
public static final String TOPIC_TAG = "TagA";
@Override
public Action consume(Message message, ConsumeContext context) {
return null;
}

@Override
public void sendBindObject(int uid, int tid, int eid, String pageUrl) {
JSONObject json = new JSONObject();
json.put("uid", uid);
json.put("tid", tid);
json.put("eid", eid);
json.put("pageUrl", pageUrl);
producerService.sendMessage(TOPIC, TOPIC_TAG, json.toJSONString());
//第三个参数就是需要发送的内容,公司使用的Json
}
}

二、订阅消息

1、创建Ons消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
@Lazy(value=false)
public class ConsumerService implements IConsumerService {
private Consumer consumer;

@PostConstruct
public void init() {
System.out.println("init....");
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, ConsumerConstants.CONSUMERID);
properties.put(PropertyKeyConst.AccessKey,ConsumerConstants.ACCESSID);
properties.put(PropertyKeyConst.SecretKey,ConsumerConstants.ACCESSKEY);
properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
consumer = ONSFactory.createConsumer(properties);
consumer.start();
}

//MessageListener 是阿里Ons包内的消息监听器
@Override
public void subscribe(final String topic, final String subExpression,
final MessageListener listener) {
consumer.subscribe(topic, subExpression, listener);
}

@PreDestroy
public void desotry() {
if (consumer != null) {
consumer.shutdown();
}
}

}

2、创建消费消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class OnsConsumerService implements IOnsConsumerService {
@Resource
private IConsumerService consumerService;
public static final String TOPIC = "TopicTestONS";

@PostConstruct
public void init() {
System.out.println("starting...");
onsConsumerService.subscribe(TOPIC, "*", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
JSONObject parse = (JSONObject) JSONValue.parse(message.getBody());
int uid = JsonUtil.getInt(parse, "uid");
int tid = JsonUtil.getInt(parse, "tid");
int eid = JsonUtil.getInt(parse, "eid");
String pageUrl = JsonUtil.getString(parse, "pageUrl");

System.out.println(uid + tid + eid + pageUrl);
return Action.CommitMessage;
}
});
}
}

以上为Ons的集群订阅消息代码。

参考:

Ons官方文档

http://onsteam.oss-cn-hangzhou.aliyuncs.com/ALIYUN_ONS_USER_GUIDE.pdf


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!