diff --git a/.gitignore b/.gitignore index 28269e5..ebc0627 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,11 @@ bin +target obj +.project +.cache +.settings +.classpath *.suo *.csproj.user *.class -*.jar \ No newline at end of file +*.jar diff --git a/java/appendix-a/Server.java b/java/appendix-a/Server.java deleted file mode 100644 index 9895f21..0000000 --- a/java/appendix-a/Server.java +++ /dev/null @@ -1,102 +0,0 @@ -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.QueueingConsumer.Delivery; -import com.rabbitmq.client.AMQP.BasicProperties; -import org.json.JSONObject; - -public class Server -{ - private Connection connection; - private Channel channel; - private QueueingConsumer consumer; - - public Server Server(){ - return this; - } - - public Server init() - throws Exception { - ConnectionFactory factory = new ConnectionFactory(); - factory.setUsername("rpc_user"); - factory.setPassword("rpcme"); - connection = factory.newConnection(); - channel = connection.createChannel(); - - channel.exchangeDeclare("rpc", "direct"); - channel.queueDeclare("ping", false, false, false, null); - channel.queueBind("ping", "rpc", "ping"); - - consumer = new QueueingConsumer(channel); - channel.basicConsume("ping", false, "ping", consumer); - - System.out.println( - "Waiting for RPC calls..." - ); - - return this; - } - - public void closeConnection() { - if (connection != null) { - try { - connection.close(); - } - catch (Exception ignore) {} - } - } - - public void serveRequests() { - while (true) { - try { - - Delivery delivery = consumer.nextDelivery(); - BasicProperties props = delivery.getProperties(); - - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); - System.out.println( - "Received API call...replying..." - ); - - channel.basicPublish( - "", - props.getReplyTo(), - null, - getResponse(delivery).getBytes("UTF-8") - ); - - } catch (Exception e){ - System.out.println(e.toString()); - } - } - } - - private String getResponse(Delivery delivery) { - String response = null; - try { - String message = new String(delivery.getBody(), "UTF-8"); - JSONObject jsonobject = new JSONObject(message); - response = "Pong!" + jsonobject.getString("time"); - } - catch (Exception e){ - System.out.println(e.toString()); - response = ""; - } - return response; - } - - public static void main(String[] args) { - Server server = null; - try { - server = new Server(); - server.init().serveRequests(); - } catch(Exception e) { - e.printStackTrace(); - } finally { - if(server != null) { - server.closeConnection(); - } - } - } -} \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml new file mode 100644 index 0000000..6fc2544 --- /dev/null +++ b/java/pom.xml @@ -0,0 +1,35 @@ + + 4.0.0 + + rabbitmqinaction + sourcecode-java + 0.0.1-SNAPSHOT + jar + + sourcecode-java + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + com.rabbitmq + amqp-client + 2.8.4 + + + org.json + json + 20090211 + + + diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java b/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java new file mode 100644 index 0000000..f0e6dc7 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/GenericConfiguration.java @@ -0,0 +1,24 @@ +/** + * RabbitMQ in Action - Generic constants + * + * @author Simon Fraser, Siniatech Ltd + */ +package rabbitmqinaction.sourcecode; + +import java.util.HashMap; +import java.util.Map; + +public class GenericConfiguration { + + public static final boolean ACTIVE = false; + public static final boolean PASSIVE = false; + public static final boolean DURABLE = true; + public static final boolean NON_DURABLE = false; + public static final boolean AUTO_DELETE = true; + public static final boolean NON_AUTO_DELETE = false; + public static final String DIRECT_EXCHANGE_TYPE = "direct"; + public static final String HOST = "localhost"; + public static final String PLAIN_CONTENT_TYPE = "text/plain"; + public static final Map EMPTY_MAP = new HashMap(); + +} diff --git a/java/appendix-a/Client.java b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Client.java similarity index 98% rename from java/appendix-a/Client.java rename to java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Client.java index af1594c..15a66d2 100644 --- a/java/appendix-a/Client.java +++ b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Client.java @@ -1,3 +1,5 @@ +package rabbitmqinaction.sourcecode.appendixA; + import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java new file mode 100644 index 0000000..cd2db73 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/appendixA/Server.java @@ -0,0 +1,88 @@ +package rabbitmqinaction.sourcecode.appendixA; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.QueueingConsumer.Delivery; +import com.rabbitmq.client.AMQP.BasicProperties; +import org.json.JSONObject; + +public class Server { + private Connection connection; + private Channel channel; + private QueueingConsumer consumer; + + public Server init() throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( "rpc_user" ); + factory.setPassword( "rpcme" ); + connection = factory.newConnection(); + channel = connection.createChannel(); + + channel.exchangeDeclare( "rpc", "direct" ); + channel.queueDeclare( "ping", false, false, false, null ); + channel.queueBind( "ping", "rpc", "ping" ); + + consumer = new QueueingConsumer( channel ); + channel.basicConsume( "ping", false, "ping", consumer ); + + System.out.println( "Waiting for RPC calls..." ); + + return this; + } + + public void closeConnection() { + if ( connection != null ) { + try { + connection.close(); + } catch ( Exception ignore ) { + } + } + } + + public void serveRequests() { + while ( true ) { + try { + + Delivery delivery = consumer.nextDelivery(); + BasicProperties props = delivery.getProperties(); + + channel.basicAck( delivery.getEnvelope().getDeliveryTag(), false ); + System.out.println( "Received API call...replying..." ); + + channel.basicPublish( "", props.getReplyTo(), null, getResponse( delivery ).getBytes( "UTF-8" ) ); + + } catch ( Exception e ) { + System.out.println( e.toString() ); + } + } + } + + private String getResponse( Delivery delivery ) { + String response = null; + try { + String message = new String( delivery.getBody(), "UTF-8" ); + JSONObject jsonobject = new JSONObject( message ); + response = "Pong!" + jsonobject.getString( "time" ); + } catch ( Exception e ) { + System.out.println( e.toString() ); + response = ""; + } + return response; + } + + public static void main( String[] args ) { + Server server = null; + try { + server = new Server(); + server.init().serveRequests(); + } catch ( Exception e ) { + e.printStackTrace(); + } finally { + if ( server != null ) { + server.closeConnection(); + } + } + } +} \ No newline at end of file diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java new file mode 100644 index 0000000..9b35dc5 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Chapter2Configuration.java @@ -0,0 +1,19 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +public class Chapter2Configuration { + + public static final String ROUTING_KEY = "hola"; + public static final String USERNAME = "guest"; + public static final String PASSWORD = "guest"; + public static final String EXCHANGE = "hello-exchange"; + public static final String QUEUE_NAME = "hello-queue"; + public static final String CONSUMER_TAG = "hello-consumer"; + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java new file mode 100644 index 0000000..467e1e5 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Consumer.java @@ -0,0 +1,61 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; +import java.util.HashMap; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class Consumer { + + public static void main( String[] args ) throws IOException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare( EXCHANGE, DIRECT_EXCHANGE_TYPE, ACTIVE, DURABLE, NON_AUTO_DELETE, new HashMap() ); + channel.queueDeclare( QUEUE_NAME, ACTIVE, DURABLE, NON_AUTO_DELETE, null ); + channel.queueBind( QUEUE_NAME, EXCHANGE, ROUTING_KEY, EMPTY_MAP ); + channel.basicConsume( QUEUE_NAME, false, CONSUMER_TAG, new ConsumerCallback( channel ) ); + } + +} + +class ConsumerCallback extends DefaultConsumer { + + public ConsumerCallback( Channel channel ) { + super( channel ); + } + + @Override + public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body ) throws IOException { + String msg = new String( body ); + if ( "quit".equals( msg ) ) { + getChannel().basicCancel( consumerTag ); + getChannel().close(); + getChannel().getConnection().close(); + System.exit( 0 ); + } else { + System.out.println( msg ); + } + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java new file mode 100644 index 0000000..a50ac9d --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ConsumerWithConfirms.java @@ -0,0 +1,62 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; +import java.util.HashMap; + +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class ConsumerWithConfirms { + + public static void main( String[] args ) throws IOException { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare( EXCHANGE, DIRECT_EXCHANGE_TYPE, ACTIVE, DURABLE, NON_AUTO_DELETE, new HashMap() ); + channel.queueDeclare( QUEUE_NAME, ACTIVE, DURABLE, NON_AUTO_DELETE, null ); + channel.queueBind( QUEUE_NAME, EXCHANGE, ROUTING_KEY, EMPTY_MAP ); + channel.basicConsume( QUEUE_NAME, false, CONSUMER_TAG, new ConsumerCallbackWithConfirms( channel ) ); + } + +} + +class ConsumerCallbackWithConfirms extends DefaultConsumer { + + public ConsumerCallbackWithConfirms( Channel channel ) { + super( channel ); + } + + @Override + public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body ) throws IOException { + getChannel().basicAck( envelope.getDeliveryTag(), false ); + String msg = new String( body ); + if ( "quit".equals( msg ) ) { + getChannel().basicCancel( consumerTag ); + getChannel().close(); + getChannel().getConnection().close(); + System.exit( 0 ); + } else { + System.out.println( msg ); + } + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java new file mode 100644 index 0000000..b9f0787 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/Producer.java @@ -0,0 +1,49 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class Producer { + + public static void main( String[] args ) throws IOException, InterruptedException { + if ( args.length != 1 ) { + System.err.println( "Message body must be supplied" ); + System.exit( 1 ); + } + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + String msg = args[0]; + Builder msgPropertiesBuilder = new AMQP.BasicProperties.Builder(); + msgPropertiesBuilder.contentType( PLAIN_CONTENT_TYPE ); + BasicProperties msgProperties = msgPropertiesBuilder.build(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare( EXCHANGE, DIRECT_EXCHANGE_TYPE, PASSIVE, DURABLE, NON_AUTO_DELETE, EMPTY_MAP ); + channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); + + channel.close(); + connection.close(); + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java new file mode 100644 index 0000000..5d49b9c --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithConfirms.java @@ -0,0 +1,64 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConfirmListener; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class ProducerWithConfirms { + + public static void main( String[] args ) throws IOException, InterruptedException { + if ( args.length != 1 ) { + System.err.println( "Message body must be supplied" ); + System.exit( 1 ); + } + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + String msg = args[0]; + Builder msgPropertiesBuilder = new AMQP.BasicProperties.Builder(); + msgPropertiesBuilder.contentType( PLAIN_CONTENT_TYPE ); + BasicProperties msgProperties = msgPropertiesBuilder.build(); + + Channel channel = connection.createChannel(); + channel.confirmSelect(); + channel.addConfirmListener( new ConfirmHandler() ); + channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); + channel.waitForConfirmsOrDie(); + + channel.close(); + connection.close(); + } + +} + +class ConfirmHandler implements ConfirmListener { + + public void handleAck( long deliveryTag, boolean multiple ) throws IOException { + System.out.println( "Confirm receipt" ); + } + + public void handleNack( long deliveryTag, boolean multiple ) throws IOException { + System.out.println( "Message lost" ); + } + +} diff --git a/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java new file mode 100644 index 0000000..eeff6d6 --- /dev/null +++ b/java/src/main/java/rabbitmqinaction/sourcecode/chapter2/ProducerWithTx.java @@ -0,0 +1,50 @@ +/** + * RabbitMQ in Action - Chapter 2 Examples + * + * @author Alvaro Videla (original) + * @author Jason J. W. Williams (original) + * @author Simon Fraser, Siniatech Ltd (translation) + */ +package rabbitmqinaction.sourcecode.chapter2; + +import static rabbitmqinaction.sourcecode.GenericConfiguration.*; +import static rabbitmqinaction.sourcecode.chapter2.Chapter2Configuration.*; + +import java.io.IOException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.BasicProperties.Builder; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class ProducerWithTx { + + public static void main( String[] args ) throws IOException, InterruptedException { + if ( args.length != 1 ) { + System.err.println( "Message body must be supplied" ); + System.exit( 1 ); + } + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername( USERNAME ); + factory.setPassword( PASSWORD ); + factory.setHost( HOST ); + + Connection connection = factory.newConnection(); + + String msg = args[0]; + Builder msgPropertiesBuilder = new AMQP.BasicProperties.Builder(); + msgPropertiesBuilder.contentType( PLAIN_CONTENT_TYPE ); + BasicProperties msgProperties = msgPropertiesBuilder.build(); + + Channel channel = connection.createChannel(); + channel.txSelect(); + channel.basicPublish( EXCHANGE, ROUTING_KEY, msgProperties, msg.getBytes() ); + channel.txCommit(); + + channel.close(); + connection.close(); + } + +}