Defining a RabbitMQ broker endpoint in Camel is possible with the Bluelock camel-spring-amqp library (https://github.com/Bluelock/camel-spring-amqp). It’s an Apache Camel component that communicates natively with a RabbitMQ broker, and it’s implemented using Spring AMQP.
First, in the Eclipse IDE, create a new Maven project from the archetype with Artifact ID camel-archetype-spring. This allows using the Spring DSL to configure the Camel route and executing the camel:run goal of the Camel Maven Plugin in a forked JVM from Maven.
To resolve the dependencies, these entries are mandatory:
<!-- RabbitMQ component for Camel, built on Spring AMQP -->
<dependency>
  <groupId>com.bluelock</groupId>
  <artifactId>camel-spring-amqp</artifactId>
  <version>1.2</version>
</dependency>

<!-- Camel dependencies; all of them share the ${org.camel.version} property -->
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-test</artifactId>
  <version>${org.camel.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring</artifactId>
  <version>${org.camel.version}</version>
  <!-- spring-tx and spring-context are not inherited from camel-spring.
       NOTE(review): presumably so the Spring versions of another dependency win; confirm. -->
  <exclusions>
    <exclusion>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-xstream</artifactId>
  <version>${org.camel.version}</version>
</dependency>
<!-- RabbitMQ component for Camel, built on Spring AMQP -->
<dependency>
  <groupId>com.bluelock</groupId>
  <artifactId>camel-spring-amqp</artifactId>
  <version>1.2</version>
</dependency>

<!-- Camel dependencies; all of them share the ${org.camel.version} property -->
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-test</artifactId>
  <version>${org.camel.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring</artifactId>
  <version>${org.camel.version}</version>
  <!-- spring-tx and spring-context are not inherited from camel-spring.
       NOTE(review): presumably so the Spring versions of another dependency win; confirm. -->
  <exclusions>
    <exclusion>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-xstream</artifactId>
  <version>${org.camel.version}</version>
</dependency>
At this point, edit the camel-context.xml file located in the src/main/resources/META-INF/spring folder:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context: one Camel route that consumes from a RabbitMQ queue through the
     spring-amqp component, plus the AMQP beans that the component looks up. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <!-- Endpoint path is exchange:queue:routingKey. The separators between the query
           options are written as &amp; because a bare ampersand is not allowed inside
           an XML attribute value. -->
      <from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&amp;autodelete=true&amp;durable=true" />
      <log message="Message available on a RabbitMQ Queue" />
      <!-- NOTE(review): no bean with id "processorTest" is declared in this file;
           it has to be provided elsewhere in the application context. -->
      <process ref="processorTest" />
    </route>
  </camelContext>

  <!-- Template and admin share the single connection factory declared below. -->
  <rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" />
  <rabbit:admin connection-factory="amqpConnectionFactory"/>

  <!-- The only definition of "amqpConnectionFactory". The additional empty
       rabbit:connection-factory element with the same id was dropped so that the id
       is declared exactly once. -->
  <bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="10.211.55.20"/>
    <property name="port" value="5672"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="virtualHost" value="/"/>
  </bean>

  <!-- Message converters, selected by content type; text is the fallback. -->
  <bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
  <bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
  <bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
    <property name="converters">
      <map>
        <entry key="application/json" value-ref="jsonMessageConverter"/>
        <entry key="application/xml" value-ref="textMessageConverter"/>
      </map>
    </property>
    <property name="fallbackConverter" ref="textMessageConverter"/>
  </bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context: one Camel route that consumes from a RabbitMQ queue through the
     spring-amqp component, plus the AMQP beans that the component looks up. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <!-- Endpoint path is exchange:queue:routingKey. The separators between the query
           options are written as &amp; because a bare ampersand is not allowed inside
           an XML attribute value. -->
      <from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&amp;autodelete=true&amp;durable=true" />
      <log message="Message available on a RabbitMQ Queue" />
      <!-- NOTE(review): no bean with id "processorTest" is declared in this file;
           it has to be provided elsewhere in the application context. -->
      <process ref="processorTest" />
    </route>
  </camelContext>

  <!-- Template and admin share the single connection factory declared below. -->
  <rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" />
  <rabbit:admin connection-factory="amqpConnectionFactory"/>

  <!-- The only definition of "amqpConnectionFactory". The additional empty
       rabbit:connection-factory element with the same id was dropped so that the id
       is declared exactly once. -->
  <bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="10.211.55.20"/>
    <property name="port" value="5672"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="virtualHost" value="/"/>
  </bean>

  <!-- Message converters, selected by content type; text is the fallback. -->
  <bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
  <bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
  <bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
    <property name="converters">
      <map>
        <entry key="application/json" value-ref="jsonMessageConverter"/>
        <entry key="application/xml" value-ref="textMessageConverter"/>
      </map>
    </property>
    <property name="fallbackConverter" ref="textMessageConverter"/>
  </bean>
</beans>
In this case, we have created a route that goes from a RabbitMQ queue to the system log, together with a listener on the queue that stays active until you terminate the Maven camel:run process.
It’s very important to note that, in Spring XML, the & character has to be escaped as &amp; (for example between the options of an endpoint URI).
The following Java code can be used to send a message to the exchange defined in the Camel route:
@Test
public void test() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.20");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("KipcastDirect", "direct",
true, /* durable */
true, /* autodelete */
null); /* */
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties.Builder basic = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties minBasic = basic.build();
minBasic = basic.priority(0).deliveryMode(1).build();
channel.basicPublish("KipcastDirect", "KipcastRouting", minBasic, messageBodyBytes);
System.out.println(" [x] Sent ");
channel.close();
} |
/**
 * Publishes one "Hello, world!" message to the "KipcastDirect" exchange using the
 * "KipcastRouting" routing key.
 *
 * Needs a broker reachable at 10.211.55.20:5672 (vhost "/", user guest/guest).
 * The exchange is declared with the same type, durable and autodelete flags as the
 * Camel endpoint URI, so that both sides agree on its parameters.
 *
 * @throws IOException if connecting, declaring, publishing or closing fails
 */
@Test
public void test() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("10.211.55.20");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        try {
            channel.exchangeDeclare("KipcastDirect", "direct",
                    true,  /* durable */
                    true,  /* autodelete */
                    null); /* no additional arguments */

            // Explicit charset so the payload does not depend on the platform default.
            byte[] messageBodyBytes = "Hello, world!".getBytes("UTF-8");

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .priority(0)
                    .deliveryMode(1)
                    .build();

            channel.basicPublish("KipcastDirect", "KipcastRouting", properties, messageBodyBytes);
            System.out.println(" [x] Sent ");
        } finally {
            channel.close();
        }
    } finally {
        // The connection was previously left open; close it even when publishing fails.
        connection.close();
    }
}
To test whether the listener works, first start the queue listener and then run the JUnit class to send a message. The output will be:
[pache.camel.spring.Main.main()] MainSupport INFO Apache Camel 2.10.3 starting
[pache.camel.spring.Main.main()] SpringCamelContext INFO Apache Camel 2.10.3 (CamelContext: camel-1) is starting
[pache.camel.spring.Main.main()] ManagementStrategyFactory INFO JMX enabled.
[pache.camel.spring.Main.main()] DefaultTypeConverter INFO Loaded 177 type converters
[pache.camel.spring.Main.main()] SpringAMQPComponent INFO Found AMQP ConnectionFactory in registry for 10.211.55.20
[pache.camel.spring.Main.main()] SpringAMQPComponent INFO Found AMQP Template in registry
[pache.camel.spring.Main.main()] SpringAMQPComponent INFO Found AMQP Administrator in registry
[pache.camel.spring.Main.main()] SpringAMQPEndpoint INFO Creating endpoint for KipcastDirect:KipcastQueue:KipcastRouting
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Declared exchange KipcastDirect
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Declared queue KipcastQueue
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Declaring binding KipcastRouting
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Started AMQP Async Listeners for spring-amqp://KipcastDirect:KipcastQueue:KipcastRouting?autodelete=true&durable=true&type=direct
[pache.camel.spring.Main.main()] SpringCamelContext INFO Route: route1 started and consuming from: Endpoint[spring-amqp://KipcastDirect:KipcastQueue:KipcastRouting?autodelete=true&durable=true&type=direct]
[pache.camel.spring.Main.main()] ultManagementLifecycleStrategy INFO StatisticsLevel at All so enabling load performance statistics
[pache.camel.spring.Main.main()] SpringCamelContext INFO Total 1 routes, of which 1 is started.
[pache.camel.spring.Main.main()] SpringCamelContext INFO Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.505 seconds
[ SimpleAsyncTaskExecutor-1] route1 INFO Message available on a RabbitMQ Queue |
[pache.camel.spring.Main.main()] MainSupport INFO Apache Camel 2.10.3 starting
[pache.camel.spring.Main.main()] SpringCamelContext INFO Apache Camel 2.10.3 (CamelContext: camel-1) is starting
[pache.camel.spring.Main.main()] ManagementStrategyFactory INFO JMX enabled.
[pache.camel.spring.Main.main()] DefaultTypeConverter INFO Loaded 177 type converters
[pache.camel.spring.Main.main()] SpringAMQPComponent INFO Found AMQP ConnectionFactory in registry for 10.211.55.20
[pache.camel.spring.Main.main()] SpringAMQPComponent INFO Found AMQP Template in registry
[pache.camel.spring.Main.main()] SpringAMQPComponent INFO Found AMQP Administrator in registry
[pache.camel.spring.Main.main()] SpringAMQPEndpoint INFO Creating endpoint for KipcastDirect:KipcastQueue:KipcastRouting
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Declared exchange KipcastDirect
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Declared queue KipcastQueue
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Declaring binding KipcastRouting
[pache.camel.spring.Main.main()] SpringAMQPConsumer INFO Started AMQP Async Listeners for spring-amqp://KipcastDirect:KipcastQueue:KipcastRouting?autodelete=true&durable=true&type=direct
[pache.camel.spring.Main.main()] SpringCamelContext INFO Route: route1 started and consuming from: Endpoint[spring-amqp://KipcastDirect:KipcastQueue:KipcastRouting?autodelete=true&durable=true&type=direct]
[pache.camel.spring.Main.main()] ultManagementLifecycleStrategy INFO StatisticsLevel at All so enabling load performance statistics
[pache.camel.spring.Main.main()] SpringCamelContext INFO Total 1 routes, of which 1 is started.
[pache.camel.spring.Main.main()] SpringCamelContext INFO Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.505 seconds
[ SimpleAsyncTaskExecutor-1] route1 INFO Message available on a RabbitMQ Queue
At this point, you can have fun with Camel and RabbitMQ!!!!!
NOTES:
1 – Please be careful: the URIs (from and to) in the Camel Spring DSL context and the JUnit class must refer to the same exchange and queue, declared with the same parameters; otherwise the broker replies with an error such as reply-text=PRECONDITION_FAILED – parameters for queue ‘QUEUE’ in vhost ‘/’ not equivalent. To check how the queues and exchanges are configured, use:
# List the queues and exchanges of the "/" virtual host (the vhost used by the route and the test).
rabbitmqadmin -V / list queues
rabbitmqadmin -V / list exchanges
# List the queues and exchanges of the "/" virtual host (the vhost used by the route and the test).
rabbitmqadmin -V / list queues
rabbitmqadmin -V / list exchanges
if you like this post, please click on the advertise :)