Have you used the WSO2 MB with AMQP? If not this will help you get started on AMPQ with WSO2 MB. I am using the newly released WSO2MB 3.0.0-Alpha4 and RabbitMQ AMQP Java client in this example.
Setting up the project
For this example, I created a Maven project. Following code correspond to the pom.xml used in the project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.a5anka.amqp.helloworld</groupId>
<artifactId>pub-sub</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
</project>
I have added RabbitMQ AMQP client as a dependency in the pom.xml file.
Publisher
Publisher class connect to the WSO2 MB and publish a simple text message to the message queue “hello”.
package io.github.a5anka.amqp.helloworld;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Publisher {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello world!!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
Subscriber - Synchronous
Subscriber code specified below will connect to the WSO2 MB and retrieve the message published in “hello” message queue and print it in the console.
package io.github.a5anka.amqp.helloworld;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Subscriber {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);
System.out.println(" [*] Waiting for messages.");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Subscriber - Asynchronous
Following Subscription implementation receives messages asynchronously.
package io.github.a5anka.amqp.helloworld;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class AsyncSubscriber {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Running the example
I assume you have installed the WSO2 MB in your machine and it is run using the default configuration. You can first run the publisher code. This will publish a message in to the “hello” queue. Then you can use subscriber code to retrieve and print the message in the message queue.