From 7ed271b3377caac85fcf0527381d8cc1c2b36086 Mon Sep 17 00:00:00 2001 From: Si Fraser Date: Sun, 15 Jul 2012 19:36:18 +0100 Subject: [PATCH 1/3] chapter two java examples refactor additional examples for Maven --- .gitignore | 7 +- java/appendix-a/Server.java | 102 ------------------ java/pom.xml | 35 ++++++ .../sourcecode/GenericConfiguration.java | 24 +++++ .../sourcecode/appendixA}/Client.java | 2 + .../sourcecode/appendixA/Server.java | 88 +++++++++++++++ .../chapter2/Chapter2Configuration.java | 19 ++++ .../sourcecode/chapter2/Consumer.java | 62 +++++++++++ .../sourcecode/chapter2/Producer.java | 64 +++++++++++ 9 files changed, 300 insertions(+), 103 deletions(-) delete mode 100644 java/appendix-a/Server.java create mode 100644 java/pom.xml create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java rename java/{appendix-a => src/main/java/rabbitmqinaction/sourcecode/appendixA}/Client.java (98%) create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java diff --git a/.gitignore b/.gitignore index 28269e5..ebc0627 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,11 @@ bin +target obj +.project +.cache +.settings +.classpath *.suo *.csproj.user *.class -*.jar \ No newline at end of file +*.jar diff --git a/java/appendix-a/Server.java b/java/appendix-a/Server.java deleted file mode 100644 index 9895f21..0000000 --- a/java/appendix-a/Server.java +++ /dev/null @@ -1,102 +0,0 @@ -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.QueueingConsumer.Delivery; -import com.rabbitmq.client.AMQP.BasicProperties; -import org.json.JSONObject; - -public class Server -{ - private Connection connection; - private Channel channel; - private QueueingConsumer consumer; - - public Server Server(){ - return this; - } - - public Server init() - throws Exception { - ConnectionFactory factory = new ConnectionFactory(); - factory.setUsername("rpc_user"); - factory.setPassword("rpcme"); - connection = factory.newConnection(); - channel = connection.createChannel(); - - channel.exchangeDeclare("rpc", "direct"); - channel.queueDeclare("ping", false, false, false, null); - channel.queueBind("ping", "rpc", "ping"); - - consumer = new QueueingConsumer(channel); - channel.basicConsume("ping", false, "ping", consumer); - - System.out.println( - "Waiting for RPC calls..." - ); - - return this; - } - - public void closeConnection() { - if (connection != null) { - try { - connection.close(); - } - catch (Exception ignore) {} - } - } - - public void serveRequests() { - while (true) { - try { - - Delivery delivery = consumer.nextDelivery(); - BasicProperties props = delivery.getProperties(); - - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); - System.out.println( - "Received API call...replying..." - ); - - channel.basicPublish( - "", - props.getReplyTo(), - null, - getResponse(delivery).getBytes("UTF-8") - ); - - } catch (Exception e){ - System.out.println(e.toString()); - } - } - } - - private String getResponse(Delivery delivery) { - String response = null; - try { - String message = new String(delivery.getBody(), "UTF-8"); - JSONObject jsonobject = new JSONObject(message); - response = "Pong!" + jsonobject.getString("time"); - } - catch (Exception e){ - System.out.println(e.toString()); - response = ""; - } - return response; - } - - public static void main(String[] args) { - Server server = null; - try { - server = new Server(); - server.init().serveRequests(); - } catch(Exception e) { - e.printStackTrace(); - } finally { - if(server != null) { - server.closeConnection(); - } - } - } -} \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml new file mode 100644 index 0000000..6fc2544 --- /dev/null +++ b/java/pom.xml @@ -0,0 +1,35 @@ + + 4.0.0 + + rabbitmqinaction + sourcecode-java + 0.0.1-SNAPSHOT + jar + + sourcecode-java + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + com.rabbitmq + amqp-client + 2.8.4 + + + org.json + json + 20090211 + + + diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java b/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java new file mode 100644 index 0000000..40ac545 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java @@ -0,0 +1,24 @@ +/** + * RabbitMQ in Action - Generic constants + * + * @author Simon Fraser, Siniatech Ltd + */ +package rabbitmqinaction.sourcecode; + +import java.util.HashMap; +import java.util.Map; + +public class GenericConfiguration { + + public static final boolean ACTIVE = false; + public static final boolean PASSIVE = false; + public static final boolean DURABLE = true; + public static final boolean NON_DURABLE = false; + public static final boolean AUTO_DELETE = true; + public static final boolean NON_AUTO_DELETE = false; + public static final String DIRECT_EXCHANGE_TYPE = "direct"; + public static final String HOST = "192.168.0.19"; + public static final String PLAIN_CONTENT_TYPE = "text/plain"; + public static final Map EMPTY_MAP = new HashMap(); + +} diff --git a/java/appendix-a/Client.java b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Client.java similarity index 98% rename from java/appendix-a/Client.java rename to java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Client.java index af1594c..15a66d2 100644 --- a/java/appendix-a/Client.java +++ b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Client.java @@ -1,3 +1,5 @@ +package rabbitmqinaction.sourcecode.appendixA; + import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java new file mode 100644 index 0000000..cd2db73 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java @@ -0,0 +1,88 @@ +package rabbitmqinaction.sourcecode.appendixA; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.QueueingConsumer.Delivery; +import com.rabbitmq.client.AMQP.BasicProperties; +import org.json.JSONObject; + +public class Server { + private Connection connection; + private Channel channel; + private QueueingConsumer consumer; + + public Server init() throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( "rpc_user" ); + factory.setPassword( "rpcme" ); + connection = factory.newConnection(); + channel = connection.createChannel(); + + channel.exchangeDeclare( "rpc", "direct" ); + channel.queueDeclare( "ping", false, false, false, null ); + channel.queueBind( "ping", "rpc", "ping" ); + + consumer = new QueueingConsumer( channel ); + channel.basicConsume( "ping", false, "ping", consumer ); + + System.out.println( "Waiting for RPC calls..." ); + + return this; + } + + public void closeConnection() { + if ( connection != null ) { + try { + connection.close(); + } catch ( Exception ignore ) { + } + } + } + + public void serveRequests() { + while ( true ) { + try { + + Delivery delivery = consumer.nextDelivery(); + BasicProperties props = delivery.getProperties(); + + channel.basicAck( delivery.getEnvelope().getDeliveryTag(), false ); + System.out.println( "Received API call...replying..." ); + + channel.basicPublish( "", props.getReplyTo(), null, getResponse( delivery ).getBytes( "UTF-8" ) ); + + } catch ( Exception e ) { + System.out.println( e.toString() ); + } + } + } + + private String getResponse( Delivery delivery ) { + String response = null; + try { + String message = new String( delivery.getBody(), "UTF-8" ); + JSONObject jsonobject = new JSONObject( message ); + response = "Pong!" + jsonobject.getString( "time" ); + } catch ( Exception e ) { + System.out.println( e.toString() ); + response = ""; + } + return response; + } + + public static void main( String[] args ) { + Server server = null; + try { + server = new Server(); + server.init().serveRequests(); + } catch ( Exception e ) { + e.printStackTrace(); + } finally { + if ( server != null ) { + server.closeConnection(); + } + } + } +} \ No newline at end of file diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java new file mode 100644 index 0000000..9b35dc5 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java @@ -0,0 +1,19 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +public class Chapter2Configuration { + + public static final String ROUTING_KEY = "hola"; + public static final String USERNAME = "guest"; + public static final String PASSWORD = "guest"; + public static final String EXCHANGE = "hello-exchange"; + public static final String QUEUE_NAME = "hello-queue"; + public static final String CONSUMER_TAG = "hello-consumer"; + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java new file mode 100644 index 0000000..e8000dd --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java @@ -0,0 +1,62 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; +import java.util.HashMap; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class Consumer { + + public static void main( String[] args ) throws IOException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare( EXCHANGE, DIRECT_EXCHANGE_TYPE, ACTIVE, DURABLE, NON_AUTO_DELETE, new HashMap() ); + channel.queueDeclare( QUEUE_NAME, ACTIVE, DURABLE, NON_AUTO_DELETE, null ); + channel.queueBind( QUEUE_NAME, EXCHANGE, ROUTING_KEY, EMPTY_MAP ); + channel.basicConsume( QUEUE_NAME, false, CONSUMER_TAG, new ConsumerCallback( channel ) ); + } + +} + +class ConsumerCallback extends DefaultConsumer { + + public ConsumerCallback( Channel channel ) { + super( channel ); + } + + @Override + public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body ) throws IOException { + getChannel().basicAck( envelope.getDeliveryTag(), false ); + String msg = new String( body ); + if ( "quit".equals( msg ) ) { + getChannel().basicCancel( consumerTag ); + getChannel().close(); + getChannel().getConnection().close(); + System.exit( 0 ); + } else { + System.out.println( msg ); + } + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java new file mode 100644 index 0000000..9c4911e --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java @@ -0,0 +1,64 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConfirmListener; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class Producer { + + public static void main( String[] args ) throws IOException, InterruptedException { + if ( args.length != 1 ) { + System.err.println( "Message body must be supplied" ); + System.exit( 1 ); + } + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + String msg = args[0]; + Builder msgPropertiesBuilder = new AMQP.BasicProperties.Builder(); + msgPropertiesBuilder.contentType( PLAIN_CONTENT_TYPE ); + BasicProperties msgProperties = msgPropertiesBuilder.build(); + + Channel channel = connection.createChannel(); + channel.confirmSelect(); + channel.addConfirmListener( new ConfirmHandler() ); + channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); + channel.waitForConfirmsOrDie(); + + channel.close(); + connection.close(); + } + +} + +class ConfirmHandler implements ConfirmListener { + + public void handleAck( long deliveryTag, boolean multiple ) throws IOException { + System.out.println( "Confirm receipt" ); + } + + public void handleNack( long deliveryTag, boolean multiple ) throws IOException { + System.out.println( "Message lost" ); + } + +} From 84f9e038b0d282fac770d31d47b9f51797a4e88b Mon Sep 17 00:00:00 2001 From: Si Fraser Date: Tue, 17 Jul 2012 21:02:28 +0100 Subject: [PATCH 2/3] Use localhost in generic config --- .../java/rabbitmqinaction/sourcecode/GenericConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java b/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java index 40ac545..f0e6dc7 100644 --- a/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java +++ b/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java @@ -17,7 +17,7 @@ public class GenericConfiguration { public static final boolean AUTO_DELETE = true; public static final boolean NON_AUTO_DELETE = false; public static final String DIRECT_EXCHANGE_TYPE = "direct"; - public static final String HOST = "192.168.0.19"; + public static final String HOST = "localhost"; public static final String PLAIN_CONTENT_TYPE = "text/plain"; public static final Map EMPTY_MAP = new HashMap(); From 62560cd471d6e03e809f4a65d38d2cca916a2f34 Mon Sep 17 00:00:00 2001 From: Si Fraser Date: Wed, 25 Jul 2012 21:54:05 +0100 Subject: [PATCH 3/3] added the variations on the ch.2 examples --- .../sourcecode/chapter2/Consumer.java | 1 - .../chapter2/ConsumerWithConfirms.java | 62 ++++++++++++++++++ .../sourcecode/chapter2/Producer.java | 17 +---- .../chapter2/ProducerWithConfirms.java | 64 +++++++++++++++++++ .../sourcecode/chapter2/ProducerWithTx.java | 50 +++++++++++++++ 5 files changed, 177 insertions(+), 17 deletions(-) create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java create mode 100644 java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java index e8000dd..467e1e5 100644 --- a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java @@ -47,7 +47,6 @@ public ConsumerCallback( Channel channel ) { @Override public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body ) throws IOException { - getChannel().basicAck( envelope.getDeliveryTag(), false ); String msg = new String( body ); if ( "quit".equals( msg ) ) { getChannel().basicCancel( consumerTag ); diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java new file mode 100644 index 0000000..a50ac9d --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java @@ -0,0 +1,62 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; +import java.util.HashMap; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class ConsumerWithConfirms { + + public static void main( String[] args ) throws IOException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare( EXCHANGE, DIRECT_EXCHANGE_TYPE, ACTIVE, DURABLE, NON_AUTO_DELETE, new HashMap() ); + channel.queueDeclare( QUEUE_NAME, ACTIVE, DURABLE, NON_AUTO_DELETE, null ); + channel.queueBind( QUEUE_NAME, EXCHANGE, ROUTING_KEY, EMPTY_MAP ); + channel.basicConsume( QUEUE_NAME, false, CONSUMER_TAG, new ConsumerCallbackWithConfirms( channel ) ); + } + +} + +class ConsumerCallbackWithConfirms extends DefaultConsumer { + + public ConsumerCallbackWithConfirms( Channel channel ) { + super( channel ); + } + + @Override + public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body ) throws IOException { + getChannel().basicAck( envelope.getDeliveryTag(), false ); + String msg = new String( body ); + if ( "quit".equals( msg ) ) { + getChannel().basicCancel( consumerTag ); + getChannel().close(); + getChannel().getConnection().close(); + System.exit( 0 ); + } else { + System.out.println( msg ); + } + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java index 9c4911e..b9f0787 100644 --- a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java @@ -16,7 +16,6 @@ import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -40,25 +39,11 @@ public static void main( String[] args ) throws IOException, InterruptedExceptio BasicProperties msgProperties = msgPropertiesBuilder.build(); Channel channel = connection.createChannel(); - channel.confirmSelect(); - channel.addConfirmListener( new ConfirmHandler() ); + channel.exchangeDeclare( EXCHANGE, DIRECT_EXCHANGE_TYPE, PASSIVE, DURABLE, NON_AUTO_DELETE, EMPTY_MAP ); channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); - channel.waitForConfirmsOrDie(); channel.close(); connection.close(); } } - -class ConfirmHandler implements ConfirmListener { - - public void handleAck( long deliveryTag, boolean multiple ) throws IOException { - System.out.println( "Confirm receipt" ); - } - - public void handleNack( long deliveryTag, boolean multiple ) throws IOException { - System.out.println( "Message lost" ); - } - -} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java new file mode 100644 index 0000000..5d49b9c --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java @@ -0,0 +1,64 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConfirmListener; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class ProducerWithConfirms { + + public static void main( String[] args ) throws IOException, InterruptedException { + if ( args.length != 1 ) { + System.err.println( "Message body must be supplied" ); + System.exit( 1 ); + } + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + String msg = args[0]; + Builder msgPropertiesBuilder = new AMQP.BasicProperties.Builder(); + msgPropertiesBuilder.contentType( PLAIN_CONTENT_TYPE ); + BasicProperties msgProperties = msgPropertiesBuilder.build(); + + Channel channel = connection.createChannel(); + channel.confirmSelect(); + channel.addConfirmListener( new ConfirmHandler() ); + channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); + channel.waitForConfirmsOrDie(); + + channel.close(); + connection.close(); + } + +} + +class ConfirmHandler implements ConfirmListener { + + public void handleAck( long deliveryTag, boolean multiple ) throws IOException { + System.out.println( "Confirm receipt" ); + } + + public void handleNack( long deliveryTag, boolean multiple ) throws IOException { + System.out.println( "Message lost" ); + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java new file mode 100644 index 0000000..eeff6d6 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java @@ -0,0 +1,50 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class ProducerWithTx { + + public static void main( String[] args ) throws IOException, InterruptedException { + if ( args.length != 1 ) { + System.err.println( "Message body must be supplied" ); + System.exit( 1 ); + } + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + String msg = args[0]; + Builder msgPropertiesBuilder = new AMQP.BasicProperties.Builder(); + msgPropertiesBuilder.contentType( PLAIN_CONTENT_TYPE ); + BasicProperties msgProperties = msgPropertiesBuilder.build(); + + Channel channel = connection.createChannel(); + channel.txSelect(); + channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); + channel.txCommit(); + + channel.close(); + connection.close(); + } + +}