Tích hợp ActiveMQ với Spring
This post hasn't been updated for 7 years
Trong bài viết này tôi muốn giới thiệu với các bạn về ActiveMQ và việc tích hợp nó với framework srping 1.ActiveMQ là gì? ActiveMQ là một messaging open source nổi tiếng và mạnh mẽ,ActiveMQ có thể chạy độc lâp hay bên trong các tiến trình khác, ứng dụng server, hay ứng dụng JEE.- Hỗ trợ mọi thứ JMS yêu cầu, và có thể mở rộng.- Ngoài Java thì ActiveMQ có thể ứng dụng với .NET, C/C++, Ruby, Delphy. Bạn có thể tham khảo thêm thông tin tại trang http://activemq.apache.org/ 2.Lợi ích và làm thế nào để impliment ActiveMQ trong framwork spring ? 2.1 lợi ích :Một trong những tính năng rất hay của activeMQ đó là nó có cơ chế lưu lại các message trong queue và queu này có thể được lưu trữ dưới ổ đĩa hoặc database,khi đó nếu server bị crash hoặc message trong queue chưa được lấy ra vì lý do nào đó thì chúng ta cũng ko bị mất message vì nó đã được lưu trữ dưới file hoặc database,nên khi ta bật lại server thì ta có thẻ lấy lại các message chưa được gửi và gửi tiếp ngoài ra còn nhiều lợi ích khác nữa mà bạn có thể tham khảo tại đây 3.Giả sử bạn đã có một project maven+spring với đấy đủ các config.Tiếp đến tôi sẽ trình bay một demo nho nhỏ kết hợp giữa JMS và Activemq Ta có file activemq như sau dùng để nhúng service vào project
<!-- START SNIPPET: xbean -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:application.properties"/>
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="true">//it will save messages,default is false
<transportConnectors>
<transportConnector uri="${activemq.uri}" />//tcp://localhost:61616
</transportConnectors>
<persistenceAdapter>
<kahaPersistenceAdapter directory="${activemq.folder.queue}" maxDataFileLength="33554432"/>//where it will save the queue,nit kb,you can save into database
</persistenceAdapter>
</broker>
</beans>
<!-- END SNIPPET: xbean -->
5.Tiếp đến ta thiết lập kết nối tới service
<bean id="brokerContainer" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:activemq.xml"/>
</bean>
<!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory" scope="singleton">
<property name="brokerURL" value="${activemq.brokerURL}" />//fill broker url tcp://localhost:61616
</bean>
<bean id="producer" class="com.ActiveMQ.Producer">
</bean>
<bean id="consumer" class="com.ActiveMQ.Consumer" scope="singleton">
</bean>
6.Tạo class producer dùng để gửi một message tới hàng đợi
public class Producer {
@Autowired
private ActiveMQConnectionFactory activeMQConnectionFactory;
public void sendMailToQueue(MessageForm messageForm) throws JMSException {
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(CommonConstant.ACTIVE_QUEUE);
MessageProducer producer = session.createProducer(destination);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(messageForm);
producer.send(objectMessage);
producer.close();
session.close();
connection.close();
}
}
7.Tạo class consumer dùng để nhận mesage và gửi đi băng JMS
public class Consumer {
@Autowired
private JavaMailSender mailSender;
@Autowired
private ActiveMQConnectionFactory activeMQConnectionFactory;
public void sendMail() throws MailAuthenticationException, MailParseException, MessagingException {
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(CommonConstant.ACTIVE_QUEUE);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000L);
consumer.close();
session.close();
connection.close();
if (message != null) {
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
MessageForm messageForm= (MessageForm) objectMessage.getObject();
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper messageHelper = null;
messageHelper = new MimeMessageHelper(mimeMessage, true);
messageHelper.setFrom(new InternetAddress(messageForm.getFrom(),messageForm.getFrom()));
messageHelper.setTo(messageForm.getTo());
messageHelper.setSubject(messageForm.getSubject());
String text = messageForm.getText();
if (null == text) {
text = "";
}
mimeMessage.setContent(text, "text/html; charset=utf-8");
mailSender.send(mimeMessage);
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
public class MessageForm implements Serializable {
private String from;
private String to;
private String subject;
private String text;
public MessageForm() {
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public MessageForm(String from, String to, String subject, String text) {
this.from = from;
this.to = to;
this.subject = subject;
this.text = text;
}
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
}
Hy vọng bài viết này sẽ giúp ích bạn trong quá trình học tập và công tac Tài liệu tham khảo http://activemq.apache.org/
All Rights Reserved