1.安装:
下载下面的rpm包:
erlang-17.4-1.el6.x86_64.rpm
haproxy-1.5.18-1.el6.x86_64.rpm
openssl-1.0.1e-57.el6.x86_64.rpm
rabbitmq-server-3.6.3-1.noarch.rpm
socat-1.7.2.4-1.el6.rf.x86_64.rpm
在linux中创建一个.sh文件里面复制进去下面命令:
#!/bin/bash
mydir=`pwd`
cd/etc/yum.repo.d/
rm-rf*.repo
cd$mydir
yum-yinstallopenssl-1.0.1e-57.el6.x86_64
yum-yinstallsocat-1.7.2.4-1.el6.rf.x86_64.rpm
yum-yinstallerlang-17.4-1.el6.x86_64.rpm
yum-yinstallrabbitmq-server-3.6.3-1.noarch.rpm
rabbitmq-pluginsenablerabbitmq_management
servicerabbitmq-serverrestart
rabbitmqctladd_useradminadmin
rabbitmqctlset_permissionsadmin".*"".*"".*"
rabbitmqctlset_user_tagsadminadministrator
修改创建文件的执行文件:chmod777文件名
执行完成之后:
启动:servicerabbitmq-serverstart
停止:servicerabbitmq-serverstop
开放端口号:5672(端口号)和15672(客户端连接的端口号)
启动rabbitmq错误:
FAILED-check/var/log/rabbitmq/startup_\{log,_err\}
解决:
查看一下自己的主机名:hostname
改成127.0.0.1localhost
vim/etc/hosts
2.和java做整合:
首先在spring的xml中的头文件中配置rabbit的头文件信息
xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
然后就可以配置和rabbit连接的信息,这里定义的是直连虚拟机,就是点对点的
<!--配置connnection-factory,指定连接rabbitserver的参数--><rabbit:connection-factoryid="connectionFactory"host="192.168.133.131"username="admin"password="admin"port="5672"virtual-host="1601"/><!--通过指定下面的admin信息,当前provider中的exchange和queue会在rabbitmq服务器上自动生成--><rabbit:adminconnection-factory="connectionFactory"/><!--声明队列--><rabbit:queueauto-delete="true"durable="false"exclusive="false"name="queue_1"/><!--声明directexchange路由交换机,绑定队列,并且不持久化--><rabbit:direct-exchangename="dct_ex"auto-delete="true"durable="false"><rabbit:bindings><!--绑定队列与ROUTING_KEY--><rabbit:bindingqueue="queue_1"key="rtkey"/></rabbit:bindings></rabbit:direct-exchange><!--定义rabbittemplate用于数据的接收和发送--><!--绑定关系--><rabbit:templateid="amqTemplate"connection-factory="connectionFactory"exchange="dct_ex"routing-key="rtkey"/>
自定义监听queue的类
<!--queuelitener观察监听模式当有消息到达时会通知监听在对应队列上的监听对象--><rabbit:listener-containeracknowledge="auto"connection-factory="connectionFactory"><rabbit:listenerqueue-names="queue_1"ref="consumer"method="listen"/></rabbit:listener-container><beanid="consumer"class="www.lj.com.rabbitmq.Consumer"/>
类的信息:
publicclassConsumer{@ResourceprivateTvMappertvMapper;//2.定义数据操作对象privateObjectMapperobjectMapper=newObjectMapper();//1定义json转化的对象//具体执行业务的方法publicvoidlisten(Stringjson){System.out.println("从队列中取出:"+json);//3.转化成对象try{T_tventity=objectMapper.readValue(json,T_tv.class);//4.执行保存tvMapper.insertMiddleTable(entity.getId(),entity.getTidss());}catch(Exceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}}}
测试运行:
@ResourceprivateRabbitTemplatetabbitTemplate;//把对象转换json格式的字符串privateObjectMapperobjectMapper=newObjectMapper();@OverridepublicvoidinsertObject(T_tvtv)throwsException{tvMapper.insertObject(tv);Stringstring=objectMapper.writeValueAsString(tv);tabbitTemplate.convertAndSend(string);}
然后在listen方法中就会从queue队列中取出效果如下:
3.配置定时任务:
需要在springxml中的头文件中配置:
xmlns:task="http://www.springframework.org/schema/task"xmlns:context="http://www.springframework.org/schema/context"http://www.springframework.org/schema/taskhttp://www.springframework.org/schema/task/spring-task-4.3.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.3.xsd
下面的配置和之前的一样,多出两句一个是扫描定时任务的包,一个是启动定时任务的注解
<!--配置connnection-factory,指定连接rabbitserver的参数--><rabbit:connection-factoryid="connectionFactory"host="192.168.133.131"username="admin"password="admin"port="5672"virtual-host="1601I"/><!--通过指定下面的admin信息,当前provider中的exchange和queue会在rabbitmq服务器上自动生成--><rabbit:adminconnection-factory="connectionFactory"/><!--声明队列--><rabbit:queueauto-delete="true"durable="false"exclusive="false"name="queue_1"/><!--声明directexchange路由交换机,绑定队列,并且持久化--><rabbit:direct-exchangename="dct_ex"auto-delete="true"durable="false"><rabbit:bindings><!--绑定队列与ROUTING_KEY--><rabbit:bindingqueue="queue_1"key="rtkey"/></rabbit:bindings></rabbit:direct-exchange><!--定义rabbittemplate用于数据的接收和发送--><!--绑定关系--><rabbit:templateid="amqTemplate"connection-factory="connectionFactory"exchange="dct_ex"routing-key="rtkey"/><!--配置定时任务扫描的包--><context:component-scanbase-package="www.lj.com.rabbitmq"/><!--启动定时任务的注解--><task:annotation-driven/>
下面是扫描包下的类:
@ComponentpublicclassConsumerTask{//******//第一个*:多少秒执行一次如果为*则为每秒执行一次//第二个*:多少分钟执行一次//第三个*:多少小时执行一次//第四个*:一个月中的第几天执行一次//第五个*:第几个月执行一次//第六个*:一周中的星期几执行一次通常情况下用'?'//?只能用于第四个和第六个其中一个,因为两个会冲突//-表示范围//'/'表示每隔多长时间执行一次//'*'表示任意值@ResourceprivateRabbitTemplaterabbitTemplate;@ResourceprivateTvMappertvMapper;privateObjectMapperobjectMapper=newObjectMapper();@Scheduled(cron="*/2*****")publicvoidregularTask()throwsException{System.out.println(newSimpleDateFormat("yyyy-MM-ddhh:mm:ss").format(newDate(System.currentTimeMillis())));//接收到的消息Messagereceive=rabbitTemplate.receive("queue_1");if(receive!=null){//获取收到的数据byte[]body=receive.getBody();//将获取到的字节数组转化为String类型的字符串Stringjson=newString(body);T_tvt_tv=objectMapper.readValue(json,T_tv.class);tvMapper.insertMiddleTable(t_tv.getId(),t_tv.getTidss());}else{System.out.println("队列中无数据......");}}}
测试运行:
类里配置的2秒去queue队列中获取一次得到以下结果:
改成30秒后添加两条数据:
将时间改为2秒执行1次效果如下: