From 5e2755e5e8030b5c424d35203de5f6526746e611 Mon Sep 17 00:00:00 2001 From: Andy Kwok Date: Tue, 12 Dec 2017 14:45:24 +0800 Subject: [PATCH 1/3] Ch2-4 --- java/pom.xml | 50 +++++++++ .../SampleCode/ch2/BasicConfirmListener.java | 21 ++++ .../src/com/SampleCode/ch2/BasicConsumer.java | 35 ++++++ .../ch2/Listing2_1_HelloWorldConsumer.java | 84 ++++++++++++++ .../ch2/Listing2_1_HelloWorldProducer.java | 96 ++++++++++++++++ ...ting2_3_HelloWorldProducerWithConfirm.java | 105 ++++++++++++++++++ java/src/com/SampleCode/ch2/package-info.java | 8 ++ .../ch3/Listing3_3_LogListeners.java | 94 ++++++++++++++++ java/src/com/SampleCode/ch3/LogConsumer.java | 51 +++++++++ java/src/com/SampleCode/ch3/package-info.java | 8 ++ .../ch4/alertService/Listing4_1_2Alert.java | 96 ++++++++++++++++ .../ch4/alertService/NotifyConsumer.java | 48 ++++++++ .../ch4/alertService/package-info.java | 8 ++ .../Listing4_10_UploadPicturesTest.java | 5 + .../Listing4_11_ResizePictureConsumer.java | 5 + .../Listing4_12_rpc_server.java | 5 + .../Listing4_13_rpc_client.java | 5 + .../Listing4_7PicturesPublisher.java | 51 +++++++++ .../Listing4_8PicturesConsumer.java | 52 +++++++++ .../ch4/pictureUploader/PicConsumer.java | 29 +++++ .../ch4/pictureUploader/PicInJson.java | 24 ++++ .../ch4/pictureUploader/package-info.java | 9 ++ 22 files changed, 889 insertions(+) create mode 100644 java/pom.xml create mode 100644 java/src/com/SampleCode/ch2/BasicConfirmListener.java create mode 100644 java/src/com/SampleCode/ch2/BasicConsumer.java create mode 100644 java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java create mode 100644 java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java create mode 100644 java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java create mode 100644 java/src/com/SampleCode/ch2/package-info.java create mode 100644 java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java create mode 100644 java/src/com/SampleCode/ch3/LogConsumer.java create mode 100644 java/src/com/SampleCode/ch3/package-info.java create mode 100644 java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java create mode 100644 java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java create mode 100644 java/src/com/SampleCode/ch4/alertService/package-info.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java create mode 100644 java/src/com/SampleCode/ch4/pictureUploader/package-info.java diff --git a/java/pom.xml b/java/pom.xml new file mode 100644 index 0000000..311f2b1 --- /dev/null +++ b/java/pom.xml @@ -0,0 +1,50 @@ + + 4.0.0 + RabbitMQInActionExampleCode + RabbitMQInActionExampleCode + 0.0.1-SNAPSHOT + + appendix-a + + + maven-compiler-plugin + 3.6.1 + + + + + + + + + + + + com.rabbitmq + amqp-client + 5.0.0 + + + + + + + org.apache.commons + commons-lang3 + 3.7 + + + + + + + org.json + json + 20171018 + + + + + + \ No newline at end of file diff --git a/java/src/com/SampleCode/ch2/BasicConfirmListener.java b/java/src/com/SampleCode/ch2/BasicConfirmListener.java new file mode 100644 index 0000000..838c3e9 --- /dev/null +++ b/java/src/com/SampleCode/ch2/BasicConfirmListener.java @@ -0,0 +1,21 @@ +package com.SampleCode.ch2; + +import java.io.IOException; + +import com.rabbitmq.client.ConfirmListener; + +public class BasicConfirmListener implements ConfirmListener { + + @Override + public void handleAck(long deliveryTag, boolean multiple) throws IOException { + System.out.println(" - (BasicConfirmListener); handle ack msg for: " + deliveryTag); + + } + + @Override + public void handleNack(long deliveryTag, boolean multiple) throws IOException { + System.out.println(" - (BasicConfirmListener); handle NNNack msg for: " + deliveryTag); + + } + +} diff --git a/java/src/com/SampleCode/ch2/BasicConsumer.java b/java/src/com/SampleCode/ch2/BasicConsumer.java new file mode 100644 index 0000000..3a2ec04 --- /dev/null +++ b/java/src/com/SampleCode/ch2/BasicConsumer.java @@ -0,0 +1,35 @@ +package com.SampleCode.ch2; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class BasicConsumer extends DefaultConsumer{ + + public BasicConsumer(Channel channel) { + super(channel); + // TODO Auto-generated constructor stub + } + + + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) + throws IOException { + /*String contentType = properties.getContentType(); + Map header = properties.getHeaders(); + long deliveryTag = envelope.getDeliveryTag(); + + System.out.println("ContentType: " + contentType); + System.out.println("Header: " + header); + System.out.println("DeliveryTag: " + deliveryTag);*/ + + + + String message = new String(body, "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + } + +} diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java new file mode 100644 index 0000000..2f6666c --- /dev/null +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java @@ -0,0 +1,84 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.AMQP.Queue; + +/** + * Java version of Listing2.1 Example (Hello World consumer) + * P.31 + * @author andykwok + * + */ +public class Listing2_1_HelloWorldConsumer { + public static void main(String[] args) { + + + //Var declaration + String host = "127.0.0.1"; + int port = 5672; + String username = "guest"; + String password = "guest"; + + //Exchange config + String exName = "Hello-exchange"; + String exType = "direct"; + + + //Queue + String qName = "hello-queue"; + String routing_key = "hola"; + + //Properties + BasicProperties properties = new AMQP.BasicProperties.Builder() + .contentType("text/plain") + .build(); + + + //Construct a factory upon above parameters + ConnectionFactory factory = new ConnectionFactory (); + + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + + try(Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + + //Boolean: durable || autoDelete || internal (Passive)? + DeclareOk declareEx = ch.exchangeDeclare(exName, exType, true, false, false, null); + + if(declareEx != null) { + + //Declare exchange + Queue.DeclareOk declareQ = ch.queueDeclare(qName, true, false, true, null); + if (declareQ != null) { + + ch.queueBind(qName, exName, routing_key); + + System.out.println(" - All thing set (Start to consume!)"); + ch.basicConsume(qName, new BasicConsumer(ch)); + + }else { + System.err.println("Error, queue[" + qName + "] can't not be create!"); + } + }else { + System.err.println("Error, can't declare exchange !"); + } + + + + } catch (IOException | TimeoutException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java new file mode 100644 index 0000000..5ed7dbb --- /dev/null +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java @@ -0,0 +1,96 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Queue; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Java version of Listing2.1 Example (Hello World producer) + * P.29 + * + * Noted that as queue is not specificed in this case yet, all msg publish from + * this metho wouldn't be received by {@link Listing2_1_HelloWorldConsumer} which + * have specify "hello-queue" as its queue to use. + * @author andykwok + * + */ +public class Listing2_1_HelloWorldProducer { + + + public static void main(String[] args) { + + //Var declaration + String host = "127.0.0.1"; + int port = 5672; + String username = "guest"; + String password = "guest"; + + //Exchange config + String exName = "Hello-exchange"; + String exType = "direct"; + + //Queue + String qName = "hello-queue"; + + String routing_key = "hola"; + String payload = "Hello RabbitMQ"; + //Properties + BasicProperties properties = new AMQP.BasicProperties.Builder() + .contentType("text/plain") + .build(); + + //Construct a factory upon above parameters + ConnectionFactory factory = new ConnectionFactory (); + + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + + try(Connection conn = factory.newConnection(); + /* + * No com.rabbitmq.client.AMQP, + * No com.rabbitmq.client.impl.AMQPImpl + * But com.rabbitmq.client.Channel + */ + Channel ch = conn.createChannel();) { + + //Passive == internal??? + DeclareOk declareOk = ch.exchangeDeclare(exName, exType, true, false, false, null); + + + if(declareOk != null) { + + //Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable. + Queue.DeclareOk declareQ = ch.queueDeclare(qName, true, false, true, null); + if (declareQ != null) { + ch.queueBind(qName, exName, routing_key); + //Unlike python, properties can set upon publish time + System.out.println(" - Everything set"); + ch.basicPublish(exName, routing_key, properties, payload.getBytes()); + System.out.println(" - Publish msg: " + payload); + } + }else { + System.err.println("Error, can't declare exchange !"); + } + + + + } catch (IOException | TimeoutException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + } + + + +} diff --git a/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java new file mode 100644 index 0000000..f1a5e83 --- /dev/null +++ b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java @@ -0,0 +1,105 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Queue; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Java version of Listing2.1 Example (Hello World producer) + * P.29 + * + * Noted that as queue is not specificed in this case yet, all msg publish from + * this metho wouldn't be received by {@link Listing2_1_HelloWorldConsumer} which + * have specify "hello-queue" as its queue to use. + * @author andykwok + * + */ +public class Listing2_3_HelloWorldProducerWithConfirm { + + + public static void main(String[] args) { + + //Var declaration + String host = "127.0.0.1"; + int port = 5672; + String username = "guest"; + String password = "guest"; + + //Exchange config + String exName = "Hello-exchange"; + String exType = "direct"; + + //Queue + String qName = "hello-queue"; + + String routing_key = "hola"; + String payload = "Hello RabbitMQ"; + //Properties + BasicProperties properties = new AMQP.BasicProperties.Builder() + .contentType("text/plain") + .build(); + + //Construct a factory upon above parameters + ConnectionFactory factory = new ConnectionFactory (); + + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + + try(Connection conn = factory.newConnection(); + /* + * No com.rabbitmq.client.AMQP, + * No com.rabbitmq.client.impl.AMQPImpl + * But com.rabbitmq.client.Channel + */ + Channel ch = conn.createChannel();) { + + //Passive == internal??? + DeclareOk declareOk = ch.exchangeDeclare(exName, exType, true, false, false, null); + + + if(declareOk != null) { + + //Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable. + Queue.DeclareOk declareQ = ch.queueDeclare(qName, true, false, true, null); + if (declareQ != null) { + ch.queueBind(qName, exName, routing_key); + + + //Confirm listener + ch.addConfirmListener(new BasicConfirmListener()); + //Enable confirm support + ch.confirmSelect(); + /** + * However in automatic mode, all msg would be consider send out succcessfully + * after basicPublish (No checking). + */ + ch.basicPublish(exName, routing_key, properties, payload.getBytes()); + + System.out.println(" - Publish msg: " + payload); + } + }else { + System.err.println("Error, can't declare exchange !"); + } + + + + } catch (IOException | TimeoutException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + } + + + +} diff --git a/java/src/com/SampleCode/ch2/package-info.java b/java/src/com/SampleCode/ch2/package-info.java new file mode 100644 index 0000000..9ab3482 --- /dev/null +++ b/java/src/com/SampleCode/ch2/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch2; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java new file mode 100644 index 0000000..c945efe --- /dev/null +++ b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java @@ -0,0 +1,94 @@ +package com.SampleCode.ch3; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.ch2.Listing2_1_HelloWorldConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; + +/** + * Java version of Listing3.3 example (Log Listeners) + * + * P.54 + * @author andykwok + * + */ +public class Listing3_3_LogListeners { + + public static void main(String[] args) { + + + String ExName = "amq.rabbitmq.log"; + String ExType = "topic"; + + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("guest"); + fact.setPassword("guest"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + DeclareOk declare = ch.exchangeDeclare(ExName, ExType, true, false, true, null); + if (declare != null) { + + /* + * Declare three queues (Error || Warning || Info), + * As name is not specified, MQ server would generate a + * random name and return it along with declare ok msg. + */ + String QnameError = ch.queueDeclare().getQueue(); + String QnameWarning = ch.queueDeclare().getQueue(); + String QnameInfo = ch.queueDeclare().getQueue(); + //Bind Queue + ch.queueBind(QnameError, ExName, "Error"); + ch.queueBind(QnameWarning, ExName, "Warning"); + ch.queueBind(QnameInfo, ExName, "Info"); + //Callback + LogConsumer CeError = new LogConsumer(ch, "Error"); + LogConsumer CeWarning= new LogConsumer(ch, "Warning"); + LogConsumer CeInfo = new LogConsumer(ch, "Info"); + + //Publish + for (int i=0 ;i <100 ; i++) { + ch.basicPublish("", QnameError , null, ("Hello world[" + i + "]").getBytes()); + ch.basicPublish("", QnameWarning , null, ("Hello world[" + i + "]").getBytes()); + ch.basicPublish("", QnameInfo , null, ("Hello world[" + i + "]").getBytes()); + } + + + //Wait till all msg send out + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + try { + //consume + ch.basicConsume(QnameError, CeError ); + ch.basicConsume(QnameWarning, CeWarning ); + ch.basicConsume(QnameInfo, CeInfo ); + }catch (IOException ex) { + System.out.println("End of queue!"); + } + + + } + + + + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + + } +} diff --git a/java/src/com/SampleCode/ch3/LogConsumer.java b/java/src/com/SampleCode/ch3/LogConsumer.java new file mode 100644 index 0000000..951fa77 --- /dev/null +++ b/java/src/com/SampleCode/ch3/LogConsumer.java @@ -0,0 +1,51 @@ +package com.SampleCode.ch3; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class LogConsumer extends DefaultConsumer { + + String level; + + public LogConsumer(Channel channel) { + super(channel); + } + + /** + * Super always the first line + * Ref: https://stackoverflow.com/questions/23395513/implicit-super-constructor-person-is-undefined-must-explicitly-invoke-another + * @param channel + * @param level + */ + public LogConsumer(Channel channel, String level) { + super(channel); + this.level = level; + } + + + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + throws IOException { + String message = new String(body, "UTF-8"); + System.out.println(level + ": " + message + "\n"); + + //Acknowledge the msg , it drag the performance down, use it with caution. + /*Channel ch = getChannel(); + long deliveryTag = envelope.getDeliveryTag(); + ch.basicAck(deliveryTag, false);*/ + + + super.handleDelivery(consumerTag, envelope, properties, body); + } + + + + + + +} diff --git a/java/src/com/SampleCode/ch3/package-info.java b/java/src/com/SampleCode/ch3/package-info.java new file mode 100644 index 0000000..376e830 --- /dev/null +++ b/java/src/com/SampleCode/ch3/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch3; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java b/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java new file mode 100644 index 0000000..2f88aba --- /dev/null +++ b/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java @@ -0,0 +1,96 @@ +package com.SampleCode.ch4.alertService; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Account need to be created in advanced as book say, CLI can be found in following: + * + * - rabbitmqctl add_user alert_user alertme + * - rabbitmqctl set_permissions alert_user ".*" ".*" ".*" + * @author andykwok + * + */ +public class Listing4_1_2Alert { + public static void main(String[] args) { + String ExName = "alerts_test"; + String ExType = "topic"; + + String QnameCritical = "critical"; + String QnameRate_limit = "rate_limit"; + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("alert_user"); + fact.setPassword("alertme"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + //Declare Exchange + DeclareOk declare = ch.exchangeDeclare(ExName, ExType); + if (declare !=null) { + /** + * Listing 4.2 + */ + + //Declare queue + ch.queueDeclare(QnameCritical, false, false, false, null); + ch.queueDeclare(QnameRate_limit, false, false, false, null); + + //Bind Queue + ch.queueBind(QnameCritical, ExName, "critical.*"); + ch.queueBind(QnameRate_limit, ExName, "*.rate_limit"); + + System.out.println(" - Declare work done !"); + //produce + //ch.basicPublish(ExName, "critical.xxx", null, "XXXX".getBytes()); + + //A reasonable rate. + for (int i=0 ; i<2 ; i++) { + try { + ch.basicPublish("alerts_test", "Test.rate_limit", null, ("Hello world[" + i + "]").getBytes()); + ch.basicPublish("alerts_test", "critical.sdf", null,("Hello world[-" + i + "]").getBytes()); + }catch (Exception ex) { + System.err.println("Sender error"); + } + + } + + System.out.println("After publish!"); + + /** + * Listing 4.3 Consume msg + * + * Remember to catch exception from basucConsume! + * If you let throw all the way up, it would reach with Catch block + * for try-with-resources and trigger connection to close, which is not + * it suppose to be. + */ + + + System.out.println("Start consume!"); + + + NotifyConsumer consumer = new NotifyConsumer(ch); + //Only can acknowledge when autoack turn to false + ch.basicConsume("critical", false, consumer); + ch.basicConsume("rate_limit", false, consumer); + + + + System.out.println("Finish consume!"); + + } + + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java b/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java new file mode 100644 index 0000000..ee0bbd9 --- /dev/null +++ b/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java @@ -0,0 +1,48 @@ +package com.SampleCode.ch4.alertService; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +/** + * Listing 4.4 Critical alerts processor + * @author andykwok + * + */ +public class NotifyConsumer extends DefaultConsumer{ + + String email; + + + public NotifyConsumer(Channel channel) { + super(channel); + + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + { + + + try { + super.handleDelivery(consumerTag, envelope, properties, body); + String message = new String(body, "UTF-8"); + String recips = "ops.team@ourcompany.com"; + + System.out.println ("Send alert via e-mail! Alert Text: " + message + + "\nRecipients: " + recips + "\n"); + long deliveryTag = envelope.getDeliveryTag(); + getChannel().basicAck(deliveryTag, false); + }catch (Exception ex) { + System.err.println("Exception happen on HandleDelivery!"); + } + + } + + + + +} diff --git a/java/src/com/SampleCode/ch4/alertService/package-info.java b/java/src/com/SampleCode/ch4/alertService/package-info.java new file mode 100644 index 0000000..e05b4b1 --- /dev/null +++ b/java/src/com/SampleCode/ch4/alertService/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch4.alertService; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java new file mode 100644 index 0000000..6747428 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_10_UploadPicturesTest { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java new file mode 100644 index 0000000..331c994 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_11_ResizePictureConsumer { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java new file mode 100644 index 0000000..1a09e50 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_12_rpc_server { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java new file mode 100644 index 0000000..8f274f3 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_13_rpc_client { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java new file mode 100644 index 0000000..c9c0921 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java @@ -0,0 +1,51 @@ +package com.SampleCode.ch4.pictureUploader; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + + +/** + * Java version of listing 4.7 (Upload pictures publisher) + * @author andykwok + * + */ +public class Listing4_7PicturesPublisher { + public static void main(String[] args) { + + String ExName = "upload-pictures"; + String ExType = "fanout"; + String Qname = "add-points"; + String routeKey = "route_key"; + + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("guest"); + fact.setPassword("guest"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + if (ch.exchangeDeclare(ExName, ExType, false, true, false, null) != null) { + if (ch.queueDeclare(Qname , false, false, false, null) != null) { + if (ch.queueBind(Qname, ExName, routeKey) != null) { + //Composite content + PicInJson payload = new PicInJson("img_iddd", "user_iddd", "/m/bababab"); + ch.basicPublish(ExName, "test_route_key", null, payload.getBytes()); + } + } + } + + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java new file mode 100644 index 0000000..3d13196 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java @@ -0,0 +1,52 @@ +package com.SampleCode.ch4.pictureUploader; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.AMQP.Queue; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; + +/** + * P.76 Upload picture publisher + * @author andykwok + * + */ +public class Listing4_8PicturesConsumer { + public static void main(String[] args) { + + String ExName = "upload-pictures"; + String ExType = "fanout"; + String Qname = "add-points"; + String routeKey = "route_key"; + + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("guest"); + fact.setPassword("guest"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + PicConsumer consumer = new PicConsumer(ch); + + if (ch.exchangeDeclare(ExName, ExType, false, true, false, null) != null) { + if (ch.queueDeclare(Qname , false, false, false, null) != null) { + if (ch.queueBind(Qname, ExName, routeKey) != null) { + System.out.println("Ready to consume!"); + ch.basicConsume(Qname, consumer); + ch.basicConsume(Qname, consumer); + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java b/java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java new file mode 100644 index 0000000..3e806cd --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java @@ -0,0 +1,29 @@ +package com.SampleCode.ch4.pictureUploader; + +import java.io.IOException; + +import org.json.JSONObject; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class PicConsumer extends DefaultConsumer{ + + public PicConsumer(Channel channel) { + super(channel); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + throws IOException { + System.out.println("Hello handle delivery"); + String message = new String(body, "UTF-8"); + JSONObject json = new JSONObject (message); + System.out.println("Received msg: " + json.toString() ); + super.handleDelivery(consumerTag, envelope, properties, body); + } + + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java b/java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java new file mode 100644 index 0000000..88b4839 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java @@ -0,0 +1,24 @@ +package com.SampleCode.ch4.pictureUploader; + +import org.json.JSONObject; + +public class PicInJson extends JSONObject { + + + public PicInJson(String image_id, String user_id, String image_path) { + super(); + this.put("image_id", image_id); + this.put("user_id", user_id); + this.put("image_path", image_path); + } + + + public byte[] getBytes() { + return toString( ).getBytes(); + } + + + + + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/package-info.java b/java/src/com/SampleCode/ch4/pictureUploader/package-info.java new file mode 100644 index 0000000..ae69238 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/package-info.java @@ -0,0 +1,9 @@ +/** + * Some pieces like 4.9 / 4.10 may be missing as it's not yet a + * implementation, so only MQ part code is done. + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch4.pictureUploader; \ No newline at end of file From 71c8d1c80c3cf381fc18299b0ff9024e4c576de1 Mon Sep 17 00:00:00 2001 From: Andy Kwok Date: Tue, 12 Dec 2017 17:54:17 +0800 Subject: [PATCH 2/3] Update for Ch6/7 - Add Util for common code Please enter the commit message for your changes. Lines starting --- .../ch2/Listing2_1_HelloWorldConsumer.java | 34 ++++++---- .../ch2/Listing2_1_HelloWorldProducer.java | 22 +++--- ...ting2_3_HelloWorldProducerWithConfirm.java | 50 +++++--------- .../ch3/Listing3_3_LogListeners.java | 16 ++--- .../SampleCode/ch6/Listing6_2StdConsumer.java | 58 ++++++++++++++++ .../ch6/Listing6_4ClusterAwareConsumer.java | 67 +++++++++++++++++++ .../ch6/Listing6_5ClusterAwareProducer.java | 56 ++++++++++++++++ java/src/com/SampleCode/ch6/package-info.java | 8 +++ .../ch7/Listing7_3ShovelConsumer.java | 62 +++++++++++++++++ .../ch7/Listing7_4ShovelProducer.java | 40 +++++++++++ java/src/com/SampleCode/ch7/Order.java | 13 ++++ 11 files changed, 355 insertions(+), 71 deletions(-) create mode 100644 java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java create mode 100644 java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java create mode 100644 java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java create mode 100644 java/src/com/SampleCode/ch6/package-info.java create mode 100644 java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java create mode 100644 java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java create mode 100644 java/src/com/SampleCode/ch7/Order.java diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java index 2f6666c..1515bdc 100644 --- a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java @@ -1,8 +1,11 @@ package com.SampleCode.ch2; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; +import com.SampleCode.util.LocalConnFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -21,12 +24,7 @@ public class Listing2_1_HelloWorldConsumer { public static void main(String[] args) { - //Var declaration - String host = "127.0.0.1"; - int port = 5672; - String username = "guest"; - String password = "guest"; - + //Exchange config String exName = "Hello-exchange"; String exType = "direct"; @@ -43,13 +41,10 @@ public static void main(String[] args) { //Construct a factory upon above parameters - ConnectionFactory factory = new ConnectionFactory (); - - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); + LocalConnFactory factory = new LocalConnFactory(); + //ExecutorService es = Executors.newSingleThreadExecutor(); + //try(Connection conn = factory.newConnection(es); try(Connection conn = factory.newConnection(); Channel ch = conn.createChannel();) { @@ -65,7 +60,20 @@ public static void main(String[] args) { ch.queueBind(qName, exName, routing_key); System.out.println(" - All thing set (Start to consume!)"); - ch.basicConsume(qName, new BasicConsumer(ch)); + + BasicConsumer consumer = new BasicConsumer(ch); + for (int i=0 ; i<100 ; i++) { + System.out.println("Consumer loop[" + (i+1) + "]"); + ch.basicConsume(qName, consumer); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + System.err.println("Thread sleep error"); + } + + } + }else { System.err.println("Error, queue[" + qName + "] can't not be create!"); diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java index 5ed7dbb..46f9b4e 100644 --- a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.concurrent.TimeoutException; +import com.SampleCode.util.LocalConnFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.Queue; @@ -26,12 +27,7 @@ public class Listing2_1_HelloWorldProducer { public static void main(String[] args) { - //Var declaration - String host = "127.0.0.1"; - int port = 5672; - String username = "guest"; - String password = "guest"; - + //Exchange config String exName = "Hello-exchange"; String exType = "direct"; @@ -47,13 +43,8 @@ public static void main(String[] args) { .build(); //Construct a factory upon above parameters - ConnectionFactory factory = new ConnectionFactory (); - - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - + LocalConnFactory factory = new LocalConnFactory(); + try(Connection conn = factory.newConnection(); /* * No com.rabbitmq.client.AMQP, @@ -74,7 +65,10 @@ public static void main(String[] args) { ch.queueBind(qName, exName, routing_key); //Unlike python, properties can set upon publish time System.out.println(" - Everything set"); - ch.basicPublish(exName, routing_key, properties, payload.getBytes()); + for (int i=0 ; i<50 ; i++) { + ch.basicPublish(exName, routing_key, properties, (payload + "[" + i + "]") .getBytes()); + } + System.out.println(" - Publish msg: " + payload); } }else { diff --git a/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java index f1a5e83..3b62e70 100644 --- a/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java +++ b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.concurrent.TimeoutException; +import com.SampleCode.util.LocalConnFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.Queue; @@ -26,12 +27,6 @@ public class Listing2_3_HelloWorldProducerWithConfirm { public static void main(String[] args) { - //Var declaration - String host = "127.0.0.1"; - int port = 5672; - String username = "guest"; - String password = "guest"; - //Exchange config String exName = "Hello-exchange"; String exType = "direct"; @@ -47,12 +42,7 @@ public static void main(String[] args) { .build(); //Construct a factory upon above parameters - ConnectionFactory factory = new ConnectionFactory (); - - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); + LocalConnFactory factory = new LocalConnFactory(); try(Connection conn = factory.newConnection(); /* @@ -63,28 +53,22 @@ public static void main(String[] args) { Channel ch = conn.createChannel();) { //Passive == internal??? - DeclareOk declareOk = ch.exchangeDeclare(exName, exType, true, false, false, null); - - - if(declareOk != null) { - + if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) { //Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable. - Queue.DeclareOk declareQ = ch.queueDeclare(qName, true, false, true, null); - if (declareQ != null) { - ch.queueBind(qName, exName, routing_key); - - - //Confirm listener - ch.addConfirmListener(new BasicConfirmListener()); - //Enable confirm support - ch.confirmSelect(); - /** - * However in automatic mode, all msg would be consider send out succcessfully - * after basicPublish (No checking). - */ - ch.basicPublish(exName, routing_key, properties, payload.getBytes()); - - System.out.println(" - Publish msg: " + payload); + if (ch.queueDeclare(qName, true, false, true, null) != null) { + if (ch.queueBind(qName, exName, routing_key) != null ) { + //Confirm listener + ch.addConfirmListener(new BasicConfirmListener()); + //Enable confirm support + ch.confirmSelect(); + /** + * However in automatic mode, all msg would be consider send out succcessfully + * after basicPublish (No checking). + */ + ch.basicPublish(exName, routing_key, properties, payload.getBytes()); + + System.out.println(" - Publish msg: " + payload); + }else {System.err.println("Queue bind erroe!"); } } }else { System.err.println("Error, can't declare exchange !"); diff --git a/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java index c945efe..614f97e 100644 --- a/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java +++ b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java @@ -4,6 +4,7 @@ import java.util.concurrent.TimeoutException; import com.SampleCode.ch2.Listing2_1_HelloWorldConsumer; +import com.SampleCode.util.LocalConnFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -25,17 +26,12 @@ public static void main(String[] args) { String ExType = "topic"; - ConnectionFactory fact = new ConnectionFactory (); - fact.setHost("127.0.0.1"); - fact.setPort(5672); - fact.setUsername("guest"); - fact.setPassword("guest"); + ConnectionFactory fact = new LocalConnFactory(); + try(Connection conn = fact.newConnection(); Channel ch = conn.createChannel();) { - - DeclareOk declare = ch.exchangeDeclare(ExName, ExType, true, false, true, null); - if (declare != null) { + if (ch.exchangeDeclare(ExName, ExType, true, false, true, null) != null) { /* * Declare three queues (Error || Warning || Info), @@ -81,9 +77,7 @@ public static void main(String[] args) { } - - - + } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { diff --git a/java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java b/java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java new file mode 100644 index 0000000..5b0685f --- /dev/null +++ b/java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java @@ -0,0 +1,58 @@ +package com.SampleCode.ch6; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.SampleCode.util.NormalConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; + +/** + * Normal consumer which attempt to connect MQ Server only once. + * And terminate either connection established successfully and + * consume for once or Exception happen which lead to connection + * shutdown. (Won't re-attempt in either case). + * + * @author andykwok + * + */ +public class Listing6_2StdConsumer { + + public static void main(String[] args) { + + + //Exchange config + String exName = "cluster_test"; + String exType = "direct"; + + + //Queue + String qName = "cluster_test"; + String routingKey = "cluster_test"; + + + LocalConnFactory factory = new LocalConnFactory(); + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + + if (ch.exchangeDeclare(exName, exType, false, true, null) != null) { + if (ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey)!=null) { + + Consumer consumer = new NormalConsumer(ch); + ch.basicConsume(qName, consumer); + + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + + + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + + } +} diff --git a/java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java b/java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java new file mode 100644 index 0000000..6961489 --- /dev/null +++ b/java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java @@ -0,0 +1,67 @@ +package com.SampleCode.ch6; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.SampleCode.util.NormalConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; + +/** + * Similar to 6.2 but this time having a outer loop which let program automatically + * reconnect once single consumer is done || connection failure. -> it match the case + * that once the connecting node is down, then the program would smart enough to + * auto connect. + * + * + * + * @author andykwok + * + */ +public class Listing6_4ClusterAwareConsumer { + + public static void main(String[] args) { + + + //Exchange config + String exName = "cluster_test"; + String exType = "direct"; + + //Queue + String qName = "cluster_test"; + String routingKey = "cluster_test"; + + + LocalConnFactory factory = new LocalConnFactory(); + while (true) { + //100 Times for demonstration purpose + //for (int i=0 ; i<100 ; i++) { + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if (ch.exchangeDeclare(exName, exType, false, true, null) != null) { + if (ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey)!=null) { + + System.out.println("Consumming....."); + Consumer consumer = new NormalConsumer(ch); + for (int i = 0; i < 100; i++) { + try { + Thread.sleep(1000); + ch.basicConsume(qName, consumer); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + System.err.println("Exception occur, attempt to reconnect!"); + } + + } + } +} diff --git a/java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java b/java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java new file mode 100644 index 0000000..ae4f4af --- /dev/null +++ b/java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java @@ -0,0 +1,56 @@ +package com.SampleCode.ch6; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.concurrent.TimeoutException; + +import org.json.JSONObject; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + + + +/** + * Producer for Listing 6.2/4 which send msg to exchange "Cluster_test" + * for testing purpose. + * @author andykwok + * + */ +public class Listing6_5ClusterAwareProducer { + + public static void main(String[] args) { + + //Exchange config + String exName = "cluster_test"; + String exType = "direct"; + + //Queue + String qName = "cluster_test"; + String routingKey = "cluster_test"; + + + + + LocalConnFactory factory = new LocalConnFactory(); + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if(ch.exchangeDeclare(exName, exType, false, true, null) != null ) { + if(ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey) != null) { + + //Composite a JSon string + JSONObject msg = new JSONObject(); + msg.put("Content", "Cluster Test!"); + msg.put("time", LocalDateTime.now().toString()); + ch.basicPublish(exName, routingKey, null, msg.toString().getBytes()); + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch6/package-info.java b/java/src/com/SampleCode/ch6/package-info.java new file mode 100644 index 0000000..233a105 --- /dev/null +++ b/java/src/com/SampleCode/ch6/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch6; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java b/java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java new file mode 100644 index 0000000..1373ff1 --- /dev/null +++ b/java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java @@ -0,0 +1,62 @@ +package com.SampleCode.ch7; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.SampleCode.util.NormalConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; + +/** + * + * + * @author andykwok + * + */ +public class Listing7_3ShovelConsumer { + + public static void main(String[] args) { + + + + //Exchange config + String exName = "incoming_orders"; + String exType = "direct"; + + //Queue + String qName = "incoming_orders"; + String routingKey = "warehouse"; + + + LocalConnFactory factory = new LocalConnFactory(); + + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if (ch.exchangeDeclare(exName, exType, false, true, null) != null) { + if (ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey)!=null) { + //listen for 100s refresh for every second + for (int i = 0; i < 100; i++) { + + try { + Thread.sleep(1000); + System.out.println("Consumming....."); + Consumer consumer = new NormalConsumer(ch); + ch.basicConsume(qName, consumer); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + System.err.println("Exception occur, attempt to reconnect!"); + } + + } +} diff --git a/java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java b/java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java new file mode 100644 index 0000000..03fa947 --- /dev/null +++ b/java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java @@ -0,0 +1,40 @@ +package com.SampleCode.ch7; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + +public class Listing7_4ShovelProducer { + public static void main(String[] args) { + + //Exchange config + String exName = "incoming_orders"; + String exType = "direct"; + + //Queue + String qName = "incoming_orders"; + String routingKey = "warehouse"; + + + + + LocalConnFactory factory = new LocalConnFactory(); + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if(ch.exchangeDeclare(exName, exType, false, true, null) != null ) { + if(ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey) != null) { + Order order = new Order(1, "AVOCADO_TYPE"); + ch.basicPublish(exName, routingKey, null, order.toString().getBytes()); + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch7/Order.java b/java/src/com/SampleCode/ch7/Order.java new file mode 100644 index 0000000..a7421ce --- /dev/null +++ b/java/src/com/SampleCode/ch7/Order.java @@ -0,0 +1,13 @@ +package com.SampleCode.ch7; + +import org.json.JSONObject; + +public class Order extends JSONObject { + + public Order (int ordNum, String type) { + super(); + put("ordernum", ordNum); + put("type", type); + } + +} From 8c9b4959fcda2f254e056e4a7ecaef9e478346a1 Mon Sep 17 00:00:00 2001 From: Andy Kwok Date: Wed, 13 Dec 2017 12:01:16 +0800 Subject: [PATCH 3/3] Adding comment block for ch2-6 --- .../src/com/SampleCode/ch2/BasicConsumer.java | 18 +++-- .../ch2/Listing2_1_HelloWorldConsumer.java | 67 ++++++------------- .../ch2/Listing2_1_HelloWorldProducer.java | 48 ++++++------- ...ting2_3_HelloWorldProducerWithConfirm.java | 16 +++-- java/src/com/SampleCode/ch2/package-info.java | 6 +- .../ch3/Listing3_3_LogListeners.java | 10 ++- java/src/com/SampleCode/ch3/LogConsumer.java | 7 ++ .../ch4/alertService/Listing4_1_2Alert.java | 9 ++- .../ch4/alertService/NotifyConsumer.java | 4 -- 9 files changed, 86 insertions(+), 99 deletions(-) diff --git a/java/src/com/SampleCode/ch2/BasicConsumer.java b/java/src/com/SampleCode/ch2/BasicConsumer.java index 3a2ec04..6de8b16 100644 --- a/java/src/com/SampleCode/ch2/BasicConsumer.java +++ b/java/src/com/SampleCode/ch2/BasicConsumer.java @@ -1,35 +1,39 @@ package com.SampleCode.ch2; import java.io.IOException; +import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; + +/** + * Consumer used within ch2 example which simply print out the msg with prefix && Meta. + * @author andykwok + * + */ public class BasicConsumer extends DefaultConsumer{ public BasicConsumer(Channel channel) { super(channel); - // TODO Auto-generated constructor stub } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - /*String contentType = properties.getContentType(); + String contentType = properties.getContentType(); Map header = properties.getHeaders(); long deliveryTag = envelope.getDeliveryTag(); - System.out.println("ContentType: " + contentType); - System.out.println("Header: " + header); - System.out.println("DeliveryTag: " + deliveryTag);*/ - - String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); + System.out.println(" - ContentType: " + contentType); + System.out.println(" - Header: " + header); + System.out.println(" - DeliveryTag: " + deliveryTag); } } diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java index 1515bdc..c7efad0 100644 --- a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java @@ -24,7 +24,7 @@ public class Listing2_1_HelloWorldConsumer { public static void main(String[] args) { - + //Exchange config String exName = "Hello-exchange"; String exType = "direct"; @@ -33,60 +33,33 @@ public static void main(String[] args) { //Queue String qName = "hello-queue"; String routing_key = "hola"; - - //Properties - BasicProperties properties = new AMQP.BasicProperties.Builder() - .contentType("text/plain") - .build(); //Construct a factory upon above parameters LocalConnFactory factory = new LocalConnFactory(); - - //ExecutorService es = Executors.newSingleThreadExecutor(); - //try(Connection conn = factory.newConnection(es); try(Connection conn = factory.newConnection(); - Channel ch = conn.createChannel();) { + Channel ch = conn.createChannel();) { //Boolean: durable || autoDelete || internal (Passive)? - DeclareOk declareEx = ch.exchangeDeclare(exName, exType, true, false, false, null); - - if(declareEx != null) { - - //Declare exchange - Queue.DeclareOk declareQ = ch.queueDeclare(qName, true, false, true, null); - if (declareQ != null) { - - ch.queueBind(qName, exName, routing_key); - - System.out.println(" - All thing set (Start to consume!)"); - - BasicConsumer consumer = new BasicConsumer(ch); - for (int i=0 ; i<100 ; i++) { - System.out.println("Consumer loop[" + (i+1) + "]"); - ch.basicConsume(qName, consumer); - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - System.err.println("Thread sleep error"); - } + if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) { + if (ch.queueDeclare(qName, true, false, true, null) != null) { + if (ch.queueBind(qName, exName, routing_key) != null) { - } - - - }else { - System.err.println("Error, queue[" + qName + "] can't not be create!"); - } - }else { - System.err.println("Error, can't declare exchange !"); - } - - - + System.out.println(" - Fetch msg once per second, repeat for 100 times"); + //Reuse consumer instance + BasicConsumer consumer = new BasicConsumer(ch); + for (int i=0 ; i<100 ; i++) { + System.out.println("/////////////Consumer loop[" + (i+1) + "]"); + //ch.basicConsume(qName, consumer); + ch.basicConsume(qName, true, consumer); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {System.err.println("Thread sleep error"); } + } + }else {System.err.println("Bind Queue Error!");} + }else {System.err.println("Error, queue[" + qName + "] can't not be create!");} + }else {System.err.println("Error, can't declare exchange !");} } catch (IOException | TimeoutException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + System.err.println("Exception happen, triggered autocloseable to shut down connection, bye~");} } } diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java index 46f9b4e..2b0d32a 100644 --- a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java @@ -52,37 +52,27 @@ public static void main(String[] args) { * But com.rabbitmq.client.Channel */ Channel ch = conn.createChannel();) { - + //Passive == internal??? - DeclareOk declareOk = ch.exchangeDeclare(exName, exType, true, false, false, null); - - - if(declareOk != null) { - + if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) { //Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable. - Queue.DeclareOk declareQ = ch.queueDeclare(qName, true, false, true, null); - if (declareQ != null) { - ch.queueBind(qName, exName, routing_key); - //Unlike python, properties can set upon publish time - System.out.println(" - Everything set"); - for (int i=0 ; i<50 ; i++) { - ch.basicPublish(exName, routing_key, properties, (payload + "[" + i + "]") .getBytes()); - } - - System.out.println(" - Publish msg: " + payload); - } - }else { - System.err.println("Error, can't declare exchange !"); - } - - - - } catch (IOException | TimeoutException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - + if (ch.queueDeclare(qName, true, false, true, null) != null) { + if (ch.queueBind(qName, exName, routing_key) != null ) { + + System.out.println(" - Everything set, start to consume once per second for 50 times: "); + for (int i=0 ; i<50 ; i++) { + try { + Thread.sleep(1000); + ch.basicPublish(exName, routing_key, properties, (payload + "[" + i + "]") .getBytes()); + System.out.println(" - Publish msg: " + payload + "[" + i + "]"); + } catch (InterruptedException e) {e.printStackTrace();} + } + }else {System.err.println("Queue declare Error!"); } + }else {System.err.println("Declare Queue error!");} + }else {System.err.println("Error, can't declare exchange !");} + } catch (IOException | TimeoutException e) {System.err.println("Exception happen, triggered autocloseable bye~");} + + } diff --git a/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java index 3b62e70..7cf2832 100644 --- a/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java +++ b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java @@ -66,13 +66,19 @@ public static void main(String[] args) { * after basicPublish (No checking). */ ch.basicPublish(exName, routing_key, properties, payload.getBytes()); - System.out.println(" - Publish msg: " + payload); + //ConfirmListener take time to react which is a separate thread, so + //put main thread on sleep for 5s in order to wait. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }else {System.err.println("Queue bind erroe!"); } - } - }else { - System.err.println("Error, can't declare exchange !"); - } + }else {System.err.println("Queue declare error!");} + }else {System.err.println("Error, can't declare exchange !");} diff --git a/java/src/com/SampleCode/ch2/package-info.java b/java/src/com/SampleCode/ch2/package-info.java index 9ab3482..a7bf924 100644 --- a/java/src/com/SampleCode/ch2/package-info.java +++ b/java/src/com/SampleCode/ch2/package-info.java @@ -1,5 +1,9 @@ /** - * + * A basic workable pair of producer and consumer to demonstrate basic + * Rabbit usage. Listign 2_3 is simalr as 2_1, the only different is + * 2_3 would wait 5s after msg send, if the msg deliver to consumer + * with in the period of time, acknowledge stage would send back to producer + * and ConfirmListener would print out acknowledge msg. */ /** * @author andykwok diff --git a/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java index 614f97e..076403c 100644 --- a/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java +++ b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java @@ -13,6 +13,11 @@ /** * Java version of Listing3.3 example (Log Listeners) * + * As you may notice queue name is not specified at all, in this case server-side would + * automatically generate a random name zip it along with the DeclareOK object when + * ch.queueDecalre( ) is called. As the queue name is generated randomly which intend + * to be disposable and one time use -only. Which are exclusive / auto-delete / non durable + * * P.54 * @author andykwok * @@ -31,6 +36,7 @@ public static void main(String[] args) { try(Connection conn = fact.newConnection(); Channel ch = conn.createChannel();) { + if (ch.exchangeDeclare(ExName, ExType, true, false, true, null) != null) { /* @@ -57,12 +63,10 @@ public static void main(String[] args) { ch.basicPublish("", QnameInfo , null, ("Hello world[" + i + "]").getBytes()); } - //Wait till all msg send out try { - Thread.sleep(5000); + Thread.sleep(1000); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); } diff --git a/java/src/com/SampleCode/ch3/LogConsumer.java b/java/src/com/SampleCode/ch3/LogConsumer.java index 951fa77..af24895 100644 --- a/java/src/com/SampleCode/ch3/LogConsumer.java +++ b/java/src/com/SampleCode/ch3/LogConsumer.java @@ -7,6 +7,13 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; + + +/** + * Custom consumer which print out the msg content while received. + * @author andykwok + * + */ public class LogConsumer extends DefaultConsumer { String level; diff --git a/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java b/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java index 2f88aba..b3ec0ad 100644 --- a/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java +++ b/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java @@ -13,6 +13,11 @@ * * - rabbitmqctl add_user alert_user alertme * - rabbitmqctl set_permissions alert_user ".*" ".*" ".*" + * + * + * This piece of code used to demonstrate how routing key can be used to filter || route the + * msg, as you see critical.* || *.rate_limit are set for routing, all msg which dun meet + * the routing key woudl be simply ignored. * @author andykwok * */ @@ -34,8 +39,7 @@ public static void main(String[] args) { Channel ch = conn.createChannel();) { //Declare Exchange - DeclareOk declare = ch.exchangeDeclare(ExName, ExType); - if (declare !=null) { + if ( ch.exchangeDeclare(ExName, ExType) !=null) { /** * Listing 4.2 */ @@ -74,7 +78,6 @@ public static void main(String[] args) { * it suppose to be. */ - System.out.println("Start consume!"); diff --git a/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java b/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java index ee0bbd9..da7f1ae 100644 --- a/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java +++ b/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java @@ -1,7 +1,5 @@ package com.SampleCode.ch4.alertService; -import java.io.IOException; - import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; @@ -34,8 +32,6 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie System.out.println ("Send alert via e-mail! Alert Text: " + message + "\nRecipients: " + recips + "\n"); - long deliveryTag = envelope.getDeliveryTag(); - getChannel().basicAck(deliveryTag, false); }catch (Exception ex) { System.err.println("Exception happen on HandleDelivery!"); }