diff --git a/java/pom.xml b/java/pom.xml new file mode 100644 index 0000000..311f2b1 --- /dev/null +++ b/java/pom.xml @@ -0,0 +1,50 @@ + + 4.0.0 + RabbitMQInActionExampleCode + RabbitMQInActionExampleCode + 0.0.1-SNAPSHOT + + appendix-a + + + maven-compiler-plugin + 3.6.1 + + + + + + + + + + + + com.rabbitmq + amqp-client + 5.0.0 + + + + + + + org.apache.commons + commons-lang3 + 3.7 + + + + + + + org.json + json + 20171018 + + + + + + \ No newline at end of file diff --git a/java/src/com/SampleCode/ch2/BasicConfirmListener.java b/java/src/com/SampleCode/ch2/BasicConfirmListener.java new file mode 100644 index 0000000..838c3e9 --- /dev/null +++ b/java/src/com/SampleCode/ch2/BasicConfirmListener.java @@ -0,0 +1,21 @@ +package com.SampleCode.ch2; + +import java.io.IOException; + +import com.rabbitmq.client.ConfirmListener; + +public class BasicConfirmListener implements ConfirmListener { + + @Override + public void handleAck(long deliveryTag, boolean multiple) throws IOException { + System.out.println(" - (BasicConfirmListener); handle ack msg for: " + deliveryTag); + + } + + @Override + public void handleNack(long deliveryTag, boolean multiple) throws IOException { + System.out.println(" - (BasicConfirmListener); handle NNNack msg for: " + deliveryTag); + + } + +} diff --git a/java/src/com/SampleCode/ch2/BasicConsumer.java b/java/src/com/SampleCode/ch2/BasicConsumer.java new file mode 100644 index 0000000..6de8b16 --- /dev/null +++ b/java/src/com/SampleCode/ch2/BasicConsumer.java @@ -0,0 +1,39 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.Map; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + + +/** + * Consumer used within ch2 example which simply print out the msg with prefix && Meta. + * @author andykwok + * + */ +public class BasicConsumer extends DefaultConsumer{ + + public BasicConsumer(Channel channel) { + super(channel); + } + + + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) + throws IOException { + String contentType = properties.getContentType(); + Map header = properties.getHeaders(); + long deliveryTag = envelope.getDeliveryTag(); + + + String message = new String(body, "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + System.out.println(" - ContentType: " + contentType); + System.out.println(" - Header: " + header); + System.out.println(" - DeliveryTag: " + deliveryTag); + } + +} diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java new file mode 100644 index 0000000..c7efad0 --- /dev/null +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldConsumer.java @@ -0,0 +1,65 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.AMQP.Queue; + +/** + * Java version of Listing2.1 Example (Hello World consumer) + * P.31 + * @author andykwok + * + */ +public class Listing2_1_HelloWorldConsumer { + public static void main(String[] args) { + + + + //Exchange config + String exName = "Hello-exchange"; + String exType = "direct"; + + + //Queue + String qName = "hello-queue"; + String routing_key = "hola"; + + + //Construct a factory upon above parameters + LocalConnFactory factory = new LocalConnFactory(); + try(Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + + //Boolean: durable || autoDelete || internal (Passive)? + if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) { + if (ch.queueDeclare(qName, true, false, true, null) != null) { + if (ch.queueBind(qName, exName, routing_key) != null) { + + System.out.println(" - Fetch msg once per second, repeat for 100 times"); + //Reuse consumer instance + BasicConsumer consumer = new BasicConsumer(ch); + for (int i=0 ; i<100 ; i++) { + System.out.println("/////////////Consumer loop[" + (i+1) + "]"); + //ch.basicConsume(qName, consumer); + ch.basicConsume(qName, true, consumer); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {System.err.println("Thread sleep error"); } + } + }else {System.err.println("Bind Queue Error!");} + }else {System.err.println("Error, queue[" + qName + "] can't not be create!");} + }else {System.err.println("Error, can't declare exchange !");} + } catch (IOException | TimeoutException e) { + System.err.println("Exception happen, triggered autocloseable to shut down connection, bye~");} + } +} diff --git a/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java new file mode 100644 index 0000000..2b0d32a --- /dev/null +++ b/java/src/com/SampleCode/ch2/Listing2_1_HelloWorldProducer.java @@ -0,0 +1,80 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Queue; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Java version of Listing2.1 Example (Hello World producer) + * P.29 + * + * Noted that as queue is not specificed in this case yet, all msg publish from + * this metho wouldn't be received by {@link Listing2_1_HelloWorldConsumer} which + * have specify "hello-queue" as its queue to use. + * @author andykwok + * + */ +public class Listing2_1_HelloWorldProducer { + + + public static void main(String[] args) { + + + //Exchange config + String exName = "Hello-exchange"; + String exType = "direct"; + + //Queue + String qName = "hello-queue"; + + String routing_key = "hola"; + String payload = "Hello RabbitMQ"; + //Properties + BasicProperties properties = new AMQP.BasicProperties.Builder() + .contentType("text/plain") + .build(); + + //Construct a factory upon above parameters + LocalConnFactory factory = new LocalConnFactory(); + + try(Connection conn = factory.newConnection(); + /* + * No com.rabbitmq.client.AMQP, + * No com.rabbitmq.client.impl.AMQPImpl + * But com.rabbitmq.client.Channel + */ + Channel ch = conn.createChannel();) { + + //Passive == internal??? + if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) { + //Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable. + if (ch.queueDeclare(qName, true, false, true, null) != null) { + if (ch.queueBind(qName, exName, routing_key) != null ) { + + System.out.println(" - Everything set, start to consume once per second for 50 times: "); + for (int i=0 ; i<50 ; i++) { + try { + Thread.sleep(1000); + ch.basicPublish(exName, routing_key, properties, (payload + "[" + i + "]") .getBytes()); + System.out.println(" - Publish msg: " + payload + "[" + i + "]"); + } catch (InterruptedException e) {e.printStackTrace();} + } + }else {System.err.println("Queue declare Error!"); } + }else {System.err.println("Declare Queue error!");} + }else {System.err.println("Error, can't declare exchange !");} + } catch (IOException | TimeoutException e) {System.err.println("Exception happen, triggered autocloseable bye~");} + + + } + + + +} diff --git a/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java new file mode 100644 index 0000000..7cf2832 --- /dev/null +++ b/java/src/com/SampleCode/ch2/Listing2_3_HelloWorldProducerWithConfirm.java @@ -0,0 +1,95 @@ +package com.SampleCode.ch2; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Queue; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Java version of Listing2.1 Example (Hello World producer) + * P.29 + * + * Noted that as queue is not specificed in this case yet, all msg publish from + * this metho wouldn't be received by {@link Listing2_1_HelloWorldConsumer} which + * have specify "hello-queue" as its queue to use. + * @author andykwok + * + */ +public class Listing2_3_HelloWorldProducerWithConfirm { + + + public static void main(String[] args) { + + //Exchange config + String exName = "Hello-exchange"; + String exType = "direct"; + + //Queue + String qName = "hello-queue"; + + String routing_key = "hola"; + String payload = "Hello RabbitMQ"; + //Properties + BasicProperties properties = new AMQP.BasicProperties.Builder() + .contentType("text/plain") + .build(); + + //Construct a factory upon above parameters + LocalConnFactory factory = new LocalConnFactory(); + + try(Connection conn = factory.newConnection(); + /* + * No com.rabbitmq.client.AMQP, + * No com.rabbitmq.client.impl.AMQPImpl + * But com.rabbitmq.client.Channel + */ + Channel ch = conn.createChannel();) { + + //Passive == internal??? + if(ch.exchangeDeclare(exName, exType, true, false, false, null) != null) { + //Bind to queue "hello-queue" if want to let helloWorldConsumer to be workable. + if (ch.queueDeclare(qName, true, false, true, null) != null) { + if (ch.queueBind(qName, exName, routing_key) != null ) { + //Confirm listener + ch.addConfirmListener(new BasicConfirmListener()); + //Enable confirm support + ch.confirmSelect(); + /** + * However in automatic mode, all msg would be consider send out succcessfully + * after basicPublish (No checking). + */ + ch.basicPublish(exName, routing_key, properties, payload.getBytes()); + System.out.println(" - Publish msg: " + payload); + //ConfirmListener take time to react which is a separate thread, so + //put main thread on sleep for 5s in order to wait. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + }else {System.err.println("Queue bind erroe!"); } + }else {System.err.println("Queue declare error!");} + }else {System.err.println("Error, can't declare exchange !");} + + + + } catch (IOException | TimeoutException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + } + + + +} diff --git a/java/src/com/SampleCode/ch2/package-info.java b/java/src/com/SampleCode/ch2/package-info.java new file mode 100644 index 0000000..a7bf924 --- /dev/null +++ b/java/src/com/SampleCode/ch2/package-info.java @@ -0,0 +1,12 @@ +/** + * A basic workable pair of producer and consumer to demonstrate basic + * Rabbit usage. Listign 2_3 is simalr as 2_1, the only different is + * 2_3 would wait 5s after msg send, if the msg deliver to consumer + * with in the period of time, acknowledge stage would send back to producer + * and ConfirmListener would print out acknowledge msg. + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch2; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java new file mode 100644 index 0000000..076403c --- /dev/null +++ b/java/src/com/SampleCode/ch3/Listing3_3_LogListeners.java @@ -0,0 +1,92 @@ +package com.SampleCode.ch3; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.ch2.Listing2_1_HelloWorldConsumer; +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; + +/** + * Java version of Listing3.3 example (Log Listeners) + * + * As you may notice queue name is not specified at all, in this case server-side would + * automatically generate a random name zip it along with the DeclareOK object when + * ch.queueDecalre( ) is called. As the queue name is generated randomly which intend + * to be disposable and one time use -only. Which are exclusive / auto-delete / non durable + * + * P.54 + * @author andykwok + * + */ +public class Listing3_3_LogListeners { + + public static void main(String[] args) { + + + String ExName = "amq.rabbitmq.log"; + String ExType = "topic"; + + + ConnectionFactory fact = new LocalConnFactory(); + + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + if (ch.exchangeDeclare(ExName, ExType, true, false, true, null) != null) { + + /* + * Declare three queues (Error || Warning || Info), + * As name is not specified, MQ server would generate a + * random name and return it along with declare ok msg. + */ + String QnameError = ch.queueDeclare().getQueue(); + String QnameWarning = ch.queueDeclare().getQueue(); + String QnameInfo = ch.queueDeclare().getQueue(); + //Bind Queue + ch.queueBind(QnameError, ExName, "Error"); + ch.queueBind(QnameWarning, ExName, "Warning"); + ch.queueBind(QnameInfo, ExName, "Info"); + //Callback + LogConsumer CeError = new LogConsumer(ch, "Error"); + LogConsumer CeWarning= new LogConsumer(ch, "Warning"); + LogConsumer CeInfo = new LogConsumer(ch, "Info"); + + //Publish + for (int i=0 ;i <100 ; i++) { + ch.basicPublish("", QnameError , null, ("Hello world[" + i + "]").getBytes()); + ch.basicPublish("", QnameWarning , null, ("Hello world[" + i + "]").getBytes()); + ch.basicPublish("", QnameInfo , null, ("Hello world[" + i + "]").getBytes()); + } + + //Wait till all msg send out + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + //consume + ch.basicConsume(QnameError, CeError ); + ch.basicConsume(QnameWarning, CeWarning ); + ch.basicConsume(QnameInfo, CeInfo ); + }catch (IOException ex) { + System.out.println("End of queue!"); + } + + + } + + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + + } +} diff --git a/java/src/com/SampleCode/ch3/LogConsumer.java b/java/src/com/SampleCode/ch3/LogConsumer.java new file mode 100644 index 0000000..af24895 --- /dev/null +++ b/java/src/com/SampleCode/ch3/LogConsumer.java @@ -0,0 +1,58 @@ +package com.SampleCode.ch3; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + + + +/** + * Custom consumer which print out the msg content while received. + * @author andykwok + * + */ +public class LogConsumer extends DefaultConsumer { + + String level; + + public LogConsumer(Channel channel) { + super(channel); + } + + /** + * Super always the first line + * Ref: https://stackoverflow.com/questions/23395513/implicit-super-constructor-person-is-undefined-must-explicitly-invoke-another + * @param channel + * @param level + */ + public LogConsumer(Channel channel, String level) { + super(channel); + this.level = level; + } + + + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + throws IOException { + String message = new String(body, "UTF-8"); + System.out.println(level + ": " + message + "\n"); + + //Acknowledge the msg , it drag the performance down, use it with caution. + /*Channel ch = getChannel(); + long deliveryTag = envelope.getDeliveryTag(); + ch.basicAck(deliveryTag, false);*/ + + + super.handleDelivery(consumerTag, envelope, properties, body); + } + + + + + + +} diff --git a/java/src/com/SampleCode/ch3/package-info.java b/java/src/com/SampleCode/ch3/package-info.java new file mode 100644 index 0000000..376e830 --- /dev/null +++ b/java/src/com/SampleCode/ch3/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch3; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java b/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java new file mode 100644 index 0000000..b3ec0ad --- /dev/null +++ b/java/src/com/SampleCode/ch4/alertService/Listing4_1_2Alert.java @@ -0,0 +1,99 @@ +package com.SampleCode.ch4.alertService; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * Account need to be created in advanced as book say, CLI can be found in following: + * + * - rabbitmqctl add_user alert_user alertme + * - rabbitmqctl set_permissions alert_user ".*" ".*" ".*" + * + * + * This piece of code used to demonstrate how routing key can be used to filter || route the + * msg, as you see critical.* || *.rate_limit are set for routing, all msg which dun meet + * the routing key woudl be simply ignored. + * @author andykwok + * + */ +public class Listing4_1_2Alert { + public static void main(String[] args) { + String ExName = "alerts_test"; + String ExType = "topic"; + + String QnameCritical = "critical"; + String QnameRate_limit = "rate_limit"; + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("alert_user"); + fact.setPassword("alertme"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + //Declare Exchange + if ( ch.exchangeDeclare(ExName, ExType) !=null) { + /** + * Listing 4.2 + */ + + //Declare queue + ch.queueDeclare(QnameCritical, false, false, false, null); + ch.queueDeclare(QnameRate_limit, false, false, false, null); + + //Bind Queue + ch.queueBind(QnameCritical, ExName, "critical.*"); + ch.queueBind(QnameRate_limit, ExName, "*.rate_limit"); + + System.out.println(" - Declare work done !"); + //produce + //ch.basicPublish(ExName, "critical.xxx", null, "XXXX".getBytes()); + + //A reasonable rate. + for (int i=0 ; i<2 ; i++) { + try { + ch.basicPublish("alerts_test", "Test.rate_limit", null, ("Hello world[" + i + "]").getBytes()); + ch.basicPublish("alerts_test", "critical.sdf", null,("Hello world[-" + i + "]").getBytes()); + }catch (Exception ex) { + System.err.println("Sender error"); + } + + } + + System.out.println("After publish!"); + + /** + * Listing 4.3 Consume msg + * + * Remember to catch exception from basucConsume! + * If you let throw all the way up, it would reach with Catch block + * for try-with-resources and trigger connection to close, which is not + * it suppose to be. + */ + + System.out.println("Start consume!"); + + + NotifyConsumer consumer = new NotifyConsumer(ch); + //Only can acknowledge when autoack turn to false + ch.basicConsume("critical", false, consumer); + ch.basicConsume("rate_limit", false, consumer); + + + + System.out.println("Finish consume!"); + + } + + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java b/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java new file mode 100644 index 0000000..da7f1ae --- /dev/null +++ b/java/src/com/SampleCode/ch4/alertService/NotifyConsumer.java @@ -0,0 +1,44 @@ +package com.SampleCode.ch4.alertService; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +/** + * Listing 4.4 Critical alerts processor + * @author andykwok + * + */ +public class NotifyConsumer extends DefaultConsumer{ + + String email; + + + public NotifyConsumer(Channel channel) { + super(channel); + + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + { + + + try { + super.handleDelivery(consumerTag, envelope, properties, body); + String message = new String(body, "UTF-8"); + String recips = "ops.team@ourcompany.com"; + + System.out.println ("Send alert via e-mail! Alert Text: " + message + + "\nRecipients: " + recips + "\n"); + }catch (Exception ex) { + System.err.println("Exception happen on HandleDelivery!"); + } + + } + + + + +} diff --git a/java/src/com/SampleCode/ch4/alertService/package-info.java b/java/src/com/SampleCode/ch4/alertService/package-info.java new file mode 100644 index 0000000..e05b4b1 --- /dev/null +++ b/java/src/com/SampleCode/ch4/alertService/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch4.alertService; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java new file mode 100644 index 0000000..6747428 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_10_UploadPicturesTest.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_10_UploadPicturesTest { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java new file mode 100644 index 0000000..331c994 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_11_ResizePictureConsumer.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_11_ResizePictureConsumer { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java new file mode 100644 index 0000000..1a09e50 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_12_rpc_server.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_12_rpc_server { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java new file mode 100644 index 0000000..8f274f3 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_13_rpc_client.java @@ -0,0 +1,5 @@ +package com.SampleCode.ch4.pictureUploader; + +public class Listing4_13_rpc_client { + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java new file mode 100644 index 0000000..c9c0921 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_7PicturesPublisher.java @@ -0,0 +1,51 @@ +package com.SampleCode.ch4.pictureUploader; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + + +/** + * Java version of listing 4.7 (Upload pictures publisher) + * @author andykwok + * + */ +public class Listing4_7PicturesPublisher { + public static void main(String[] args) { + + String ExName = "upload-pictures"; + String ExType = "fanout"; + String Qname = "add-points"; + String routeKey = "route_key"; + + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("guest"); + fact.setPassword("guest"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + if (ch.exchangeDeclare(ExName, ExType, false, true, false, null) != null) { + if (ch.queueDeclare(Qname , false, false, false, null) != null) { + if (ch.queueBind(Qname, ExName, routeKey) != null) { + //Composite content + PicInJson payload = new PicInJson("img_iddd", "user_iddd", "/m/bababab"); + ch.basicPublish(ExName, "test_route_key", null, payload.getBytes()); + } + } + } + + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java new file mode 100644 index 0000000..3d13196 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/Listing4_8PicturesConsumer.java @@ -0,0 +1,52 @@ +package com.SampleCode.ch4.pictureUploader; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.AMQP.Queue; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; + +/** + * P.76 Upload picture publisher + * @author andykwok + * + */ +public class Listing4_8PicturesConsumer { + public static void main(String[] args) { + + String ExName = "upload-pictures"; + String ExType = "fanout"; + String Qname = "add-points"; + String routeKey = "route_key"; + + + ConnectionFactory fact = new ConnectionFactory (); + fact.setHost("127.0.0.1"); + fact.setPort(5672); + fact.setUsername("guest"); + fact.setPassword("guest"); + + try(Connection conn = fact.newConnection(); + Channel ch = conn.createChannel();) { + + PicConsumer consumer = new PicConsumer(ch); + + if (ch.exchangeDeclare(ExName, ExType, false, true, false, null) != null) { + if (ch.queueDeclare(Qname , false, false, false, null) != null) { + if (ch.queueBind(Qname, ExName, routeKey) != null) { + System.out.println("Ready to consume!"); + ch.basicConsume(Qname, consumer); + ch.basicConsume(Qname, consumer); + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java b/java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java new file mode 100644 index 0000000..3e806cd --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/PicConsumer.java @@ -0,0 +1,29 @@ +package com.SampleCode.ch4.pictureUploader; + +import java.io.IOException; + +import org.json.JSONObject; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class PicConsumer extends DefaultConsumer{ + + public PicConsumer(Channel channel) { + super(channel); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) + throws IOException { + System.out.println("Hello handle delivery"); + String message = new String(body, "UTF-8"); + JSONObject json = new JSONObject (message); + System.out.println("Received msg: " + json.toString() ); + super.handleDelivery(consumerTag, envelope, properties, body); + } + + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java b/java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java new file mode 100644 index 0000000..88b4839 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/PicInJson.java @@ -0,0 +1,24 @@ +package com.SampleCode.ch4.pictureUploader; + +import org.json.JSONObject; + +public class PicInJson extends JSONObject { + + + public PicInJson(String image_id, String user_id, String image_path) { + super(); + this.put("image_id", image_id); + this.put("user_id", user_id); + this.put("image_path", image_path); + } + + + public byte[] getBytes() { + return toString( ).getBytes(); + } + + + + + +} diff --git a/java/src/com/SampleCode/ch4/pictureUploader/package-info.java b/java/src/com/SampleCode/ch4/pictureUploader/package-info.java new file mode 100644 index 0000000..ae69238 --- /dev/null +++ b/java/src/com/SampleCode/ch4/pictureUploader/package-info.java @@ -0,0 +1,9 @@ +/** + * Some pieces like 4.9 / 4.10 may be missing as it's not yet a + * implementation, so only MQ part code is done. + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch4.pictureUploader; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java b/java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java new file mode 100644 index 0000000..5b0685f --- /dev/null +++ b/java/src/com/SampleCode/ch6/Listing6_2StdConsumer.java @@ -0,0 +1,58 @@ +package com.SampleCode.ch6; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.SampleCode.util.NormalConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; + +/** + * Normal consumer which attempt to connect MQ Server only once. + * And terminate either connection established successfully and + * consume for once or Exception happen which lead to connection + * shutdown. (Won't re-attempt in either case). + * + * @author andykwok + * + */ +public class Listing6_2StdConsumer { + + public static void main(String[] args) { + + + //Exchange config + String exName = "cluster_test"; + String exType = "direct"; + + + //Queue + String qName = "cluster_test"; + String routingKey = "cluster_test"; + + + LocalConnFactory factory = new LocalConnFactory(); + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + + if (ch.exchangeDeclare(exName, exType, false, true, null) != null) { + if (ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey)!=null) { + + Consumer consumer = new NormalConsumer(ch); + ch.basicConsume(qName, consumer); + + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + + + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + + } +} diff --git a/java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java b/java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java new file mode 100644 index 0000000..6961489 --- /dev/null +++ b/java/src/com/SampleCode/ch6/Listing6_4ClusterAwareConsumer.java @@ -0,0 +1,67 @@ +package com.SampleCode.ch6; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.SampleCode.util.NormalConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; + +/** + * Similar to 6.2 but this time having a outer loop which let program automatically + * reconnect once single consumer is done || connection failure. -> it match the case + * that once the connecting node is down, then the program would smart enough to + * auto connect. + * + * + * + * @author andykwok + * + */ +public class Listing6_4ClusterAwareConsumer { + + public static void main(String[] args) { + + + //Exchange config + String exName = "cluster_test"; + String exType = "direct"; + + //Queue + String qName = "cluster_test"; + String routingKey = "cluster_test"; + + + LocalConnFactory factory = new LocalConnFactory(); + while (true) { + //100 Times for demonstration purpose + //for (int i=0 ; i<100 ; i++) { + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if (ch.exchangeDeclare(exName, exType, false, true, null) != null) { + if (ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey)!=null) { + + System.out.println("Consumming....."); + Consumer consumer = new NormalConsumer(ch); + for (int i = 0; i < 100; i++) { + try { + Thread.sleep(1000); + ch.basicConsume(qName, consumer); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + System.err.println("Exception occur, attempt to reconnect!"); + } + + } + } +} diff --git a/java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java b/java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java new file mode 100644 index 0000000..ae4f4af --- /dev/null +++ b/java/src/com/SampleCode/ch6/Listing6_5ClusterAwareProducer.java @@ -0,0 +1,56 @@ +package com.SampleCode.ch6; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.concurrent.TimeoutException; + +import org.json.JSONObject; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + + + +/** + * Producer for Listing 6.2/4 which send msg to exchange "Cluster_test" + * for testing purpose. + * @author andykwok + * + */ +public class Listing6_5ClusterAwareProducer { + + public static void main(String[] args) { + + //Exchange config + String exName = "cluster_test"; + String exType = "direct"; + + //Queue + String qName = "cluster_test"; + String routingKey = "cluster_test"; + + + + + LocalConnFactory factory = new LocalConnFactory(); + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if(ch.exchangeDeclare(exName, exType, false, true, null) != null ) { + if(ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey) != null) { + + //Composite a JSon string + JSONObject msg = new JSONObject(); + msg.put("Content", "Cluster Test!"); + msg.put("time", LocalDateTime.now().toString()); + ch.basicPublish(exName, routingKey, null, msg.toString().getBytes()); + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch6/package-info.java b/java/src/com/SampleCode/ch6/package-info.java new file mode 100644 index 0000000..233a105 --- /dev/null +++ b/java/src/com/SampleCode/ch6/package-info.java @@ -0,0 +1,8 @@ +/** + * + */ +/** + * @author andykwok + * + */ +package com.SampleCode.ch6; \ No newline at end of file diff --git a/java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java b/java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java new file mode 100644 index 0000000..1373ff1 --- /dev/null +++ b/java/src/com/SampleCode/ch7/Listing7_3ShovelConsumer.java @@ -0,0 +1,62 @@ +package com.SampleCode.ch7; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.SampleCode.util.NormalConsumer; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; + +/** + * + * + * @author andykwok + * + */ +public class Listing7_3ShovelConsumer { + + public static void main(String[] args) { + + + + //Exchange config + String exName = "incoming_orders"; + String exType = "direct"; + + //Queue + String qName = "incoming_orders"; + String routingKey = "warehouse"; + + + LocalConnFactory factory = new LocalConnFactory(); + + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if (ch.exchangeDeclare(exName, exType, false, true, null) != null) { + if (ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey)!=null) { + //listen for 100s refresh for every second + for (int i = 0; i < 100; i++) { + + try { + Thread.sleep(1000); + System.out.println("Consumming....."); + Consumer consumer = new NormalConsumer(ch); + ch.basicConsume(qName, consumer); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + System.err.println("Exception occur, attempt to reconnect!"); + } + + } +} diff --git a/java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java b/java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java new file mode 100644 index 0000000..03fa947 --- /dev/null +++ b/java/src/com/SampleCode/ch7/Listing7_4ShovelProducer.java @@ -0,0 +1,40 @@ +package com.SampleCode.ch7; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.SampleCode.util.LocalConnFactory; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + +public class Listing7_4ShovelProducer { + public static void main(String[] args) { + + //Exchange config + String exName = "incoming_orders"; + String exType = "direct"; + + //Queue + String qName = "incoming_orders"; + String routingKey = "warehouse"; + + + + + LocalConnFactory factory = new LocalConnFactory(); + + try (Connection conn = factory.newConnection(); + Channel ch = conn.createChannel();) { + if(ch.exchangeDeclare(exName, exType, false, true, null) != null ) { + if(ch.queueDeclare(qName, false, false, true, null) != null) { + if (ch.queueBind(qName, exName, routingKey) != null) { + Order order = new Order(1, "AVOCADO_TYPE"); + ch.basicPublish(exName, routingKey, null, order.toString().getBytes()); + }else {System.out.println("Queue binding error!"); } + }else {System.out.println("Queue declare error!");} + }else {System.out.println("Exchange declare error!");} + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } +} diff --git a/java/src/com/SampleCode/ch7/Order.java b/java/src/com/SampleCode/ch7/Order.java new file mode 100644 index 0000000..a7421ce --- /dev/null +++ b/java/src/com/SampleCode/ch7/Order.java @@ -0,0 +1,13 @@ +package com.SampleCode.ch7; + +import org.json.JSONObject; + +public class Order extends JSONObject { + + public Order (int ordNum, String type) { + super(); + put("ordernum", ordNum); + put("type", type); + } + +}