spring大家太熟,就不多说了
rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684
本文侧重介绍如何将rabbitmq整合到项目中
ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
1.首先是生产者配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:context = "http://www.springframework.org/schema/context"
xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
< rabbit:connection-factory id = "connectionFactory" host = "localhost" username = "guest"
password = "guest" port = "5672" />
< rabbit:admin connection-factory = "connectionFactory" />
<!-- queue 队列声明-->
< rabbit:queue id = "queue_one" durable = "true" auto-delete = "false" exclusive = "false" name = "queue_one" />
<!-- exchange queue binging key 绑定 -->
< rabbit:direct-exchange name = "my-mq-exchange" durable = "true" auto-delete = "false" id="my-mq-exchange<span></ span >">
< rabbit:bindings >
< rabbit:binding queue = "queue_one" key = "queue_one_key" />
</ rabbit:bindings >
</ rabbit:direct-exchange >
<-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
< bean id = "jsonMessageConverter" class = "mq.convert.FastJsonMessageConverter" ></ bean >
<-- spring template声明-->
< rabbit:template exchange = "my-mq-exchange" id = "amqpTemplate" connection-factory = "connectionFactory" message-converter = "jsonMessageConverter" />
</ beans >
|
2.fastjson messageconver插件实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
// 这里本人换成fastjson的jsonobject实现
import fe.json.FastJson;
public class FastJsonMessageConverter extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter. class );
public static final String DEFAULT_CHARSET = "UTF-8" ;
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {
super ();
//init();
}
public void setDefaultCharset(String defaultCharset) {
this .defaultCharset = (defaultCharset != null ) ? defaultCharset
: DEFAULT_CHARSET;
}
public Object fromMessage(Message message)
throws MessageConversionException {
return null ;
}
public <T> T fromMessage(Message message,T t) {
String json = "" ;
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte [] bytes = null ;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes( this .defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content" , e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding( this .defaultCharset);
if (bytes != null ) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
} |
3.生产者端调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import java.util.List;
import org.springframework.amqp.core.AmqpTemplate;
public class MyMqGatway {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend( "queue_one_key" , obj);
}
} |
4.消费者端配置(与生产者端大同小异)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:context = "http://www.springframework.org/schema/context"
xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
< rabbit:connection-factory id = "connectionFactory" host = "localhost" username = "guest"
password = "guest" port = "5672" />
< rabbit:admin connection-factory = "connectionFactory" />
<!-- queue 队列声明-->
< rabbit:queue id = "queue_one" durable = "true" auto-delete = "false" exclusive = "false" name = "queue_one" />
<!-- exchange queue binging key 绑定 -->
< rabbit:direct-exchange name = "my-mq-exchange" durable = "true" auto-delete = "false" id = "my-mq-exchange" >
< rabbit:bindings >
< rabbit:binding queue = "queue_one" key = "queue_one_key" />
</ rabbit:bindings >
</ rabbit:direct-exchange >
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 taskExecutor这个需要自己实现一个连接池 按照官方说法 除非特别大的数据量 一般不需要连接池-->
< rabbit:listener-container connection-factory = "connectionFactory" acknowledge = "auto" task-executor = "taskExecutor" >
< rabbit:listener queues = "queue_one" ref = "queueOneLitener" />
</ rabbit:listener-container >
</ beans >
|
5.消费者端调用
1
2
3
4
5
6
7
8
9
|
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueOneLitener implements MessageListener{
@Override
public void onMessage(Message message) {
System.out.println( " data :" + message.getBody());
}
} |
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
相关推荐
Spring boot整合消息队列RabbitMQ
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
70、秒杀系统高并发之消息队列RabbitMQ和代码编写 71、秒杀系统高并发之RabbitMQ和spring整合
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
https://www.jianshu.com/p/3841059f7ca3 整合spring和rabbitmq
springcloud bus rabbitmq 分布式队列 http://knight-black-bob.iteye.com/blog/2356839
本篇文章主要介绍了消息队列 RabbitMQ 与 Spring 整合使用的实例代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
本篇文章主要介绍了rabbitmq结合spring实现消息队列优先级的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
springboot整合rabbitmq使用死信队列
mysql需开启binlog 查看是否开启binlog ... 新增队列:test.queue, 绑定canal.queue, RoutingKey:canal.routing.key canal下载及配置 https://github.com/alibaba/canal/releases/tag/canal-1.1.5 配置文件见附件
网络视频资源,如有侵权请留言/举报,资源过大上传乃是下载链接!!! 『课程介绍』: Rabbitmq企业级的消息服务系统,可以在应用间收发消息,本课程主要学习...10、整合spring方式-1 11、整合sring方式-2 12、消息持久化
积分管理系统java源码 ...3)与SpringAMQP完美整合,API丰富 4)集群模式丰富,表达式配置,HA模式,镜像队列模型 5)保证数据不丢失的前提下做到高可靠性、可用性 RabbitMQ安装与使用 Window 安装Erlang 去官
对于刚刚学习Rabbitmq的小...后续通过SpringBoot整合Rabbitmq代码示例讲解了常见面试题,如什么是死信队列、实现TTL队列、TTL实现消息队列缺陷的补足方法、延迟队列的优化方案等等,比较适合刚接触消息队列的伙伴学习。
Spring 整合 RabbitMQ简单案例,内涵详细注释,包含RabbitMQ的五种消息队列模式
SpringBoot整合RabbitMQ之Spring事件驱动模型-系统源码数据库流程图 SpringBoot整合RabbitMQ实战视频教程:https://edu.csdn.net/course/detail/9314 (感兴趣也可以加QQ联系:1974544863)
RabbitMQ连接池+SpringBoot实现。通过连接池实现将高效的管理RabbitMQ的Connection,并与springboot进行整合,实现消息发送,获取队列列表等功能。基于此可以进行更多功能的扩充。
SpringBoot-RabbitMQ消息层次 这个指南将引导你建立一个RabbitMQ AMQP服务器发布和订阅消息的过程。 声明 可以使用本人阿里云安装好的RabbitMQ服务器 host:http://120.27.114.229 username:root password:root port...
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间传递数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。 特点: RabbitMQ底层使用Erlang语言编写,传递...
springboot整合RabbitMQ实现死信/死信队列及实现源码及教程,参考博客:https://blog.csdn.net/qq_29914837/article/details/93334313