From a8e449ab998b1d4c7caa21d39e3feaecc51b3549 Mon Sep 17 00:00:00 2001 From: Ainul Habib Date: Mon, 29 Jan 2018 00:10:39 +0000 Subject: [PATCH 1/6] updaed code to add examples --- build.sbt | 5 + .../rabbitconsumer/ConnectionService.scala | 45 +++++++- .../ppb/rabbitconsumer/RabbitConnection.scala | 101 +++++++++++++++++- .../ppb/rabbitconsumer/RabbitConsumer.scala | 58 +++++++--- .../examples/RabbitConsumerApp.scala | 46 ++++++++ .../examples/RabbitSubscribeApp.scala | 61 +++++++++++ .../rabbitconsumer/RabbitConsumerSpec.scala | 4 +- 7 files changed, 296 insertions(+), 24 deletions(-) create mode 100644 src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala create mode 100644 src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala diff --git a/build.sbt b/build.sbt index 5270d5d..373093a 100644 --- a/build.sbt +++ b/build.sbt @@ -55,6 +55,11 @@ val mockito = Seq ( "org.mockito" % "mockito-core" % mockitoV % "test" ) + +publishTo := Some(Resolver.file("file", new File(System.getenv("M2_REPO")))) + + + libraryDependencies ++= logging ++ scalacheck ++ scalatest ++ amqpClient ++ scalaz ++ argonaut ++ typesafeConfig ++ mockito lazy val root = (project in file(".")) diff --git a/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala b/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala index a6d4dcc..054d944 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala @@ -1,16 +1,22 @@ package com.ppb.rabbitconsumer -import com.rabbitmq.client.ConnectionFactory -import com.typesafe.config.Config +import java.io.IOException +import com.rabbitmq.client.{AMQP, ConnectionFactory, Envelope} +import com.typesafe.config.Config import argonaut._ import com.ppb.rabbitconsumer.ConfigService.{getFilename, readExchange, readQueue, readRoutingKey} import com.ppb.rabbitconsumer.RabbitConnection._ import org.slf4j.LoggerFactory + sealed trait RabbitResponse extends Product with Serializable case object NoMoreMessages extends RabbitResponse -case class RabbitMessage(payload: Json) extends RabbitResponse +case class RabbitJsonMessage(payload: Json) extends RabbitResponse +case class RabbitPlainMessage(plainPayload: String) extends RabbitResponse +case class RabbitMessage(rabbitResponse: RabbitResponse, map: java.util.Map[String, AnyRef]) extends RabbitResponse +case class RabbitException(throwable:Throwable) extends RabbitResponse + object ConnectionService { @@ -51,8 +57,41 @@ object ConnectionService { bindQueueToExchange(queueName, exchangeName, routingKey) Cxn(getFilename(config), () => RabbitConnection.nextPayload(queueName), () => RabbitConnection.disconnect) + } + + + def newInit(config: Config, ifCreateQueue:Boolean = false): Cxn = { + implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) + + val queueName = readQueue(config) + val exchangeName = readExchange(config) + val routingKey = readRoutingKey(config) + if (ifCreateQueue) { + createQueue(queueName) + } + bindQueueToExchange(queueName, exchangeName, routingKey) + + Cxn("", () => RabbitConnection.newNextPayload(queueName), () => RabbitConnection.disconnect) + } + + + def subscribe(config: Config): Sxn = { + implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) + + val queueName = readQueue(config) + val exchangeName = readExchange(config) + val routingKey = readRoutingKey(config) + + bindQueueToExchange(queueName, exchangeName, routingKey) + + Sxn(RabbitConnection.registerListener(queueName), ()=>RabbitConnection.disconnect) + } + + + + def done(config: Config): Unit = { val queueName = readQueue(config) logger.info(s"Deleting $queueName") diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala index b29732f..8fb2d27 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala @@ -1,12 +1,64 @@ package com.ppb.rabbitconsumer import argonaut.Json -import com.rabbitmq.client.{Channel, Connection} +import com.rabbitmq.client._ import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} import argonaut._ import Argonaut._ +import java.io.IOException + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + + +sealed trait MessageParser { def msgParser(load:Array[Byte]):Try[RabbitResponse] } + +case object PlainMessageParser extends MessageParser { + override def msgParser(load: Array[Byte]): Try[RabbitResponse] = { + new String(load, "UTF-8") match { + case null => Failure(new Exception("Null Payload")) + case (text) => Success(RabbitPlainMessage(text)) + } + } +} + +case object JSONMessageParser extends MessageParser { + override def msgParser(load: Array[Byte]): Try[RabbitResponse] = { + new String(load, "UTF-8").parse match { + case Right(json) => Success(RabbitJsonMessage(json)) + case Left(error) => Failure(new IllegalStateException(error)) + } + } +} + + + + +//trait QueueSubscriber { +// def subscribeNow(quene: String, consumer: (Array[Byte]) => Unit): Unit; +//} + +// +//trait QueueSubscriber extends Consumer { +// def messageReceived(consumerTag:String, envelop:Envelope, properties:AMQP.BasicProperties, payload:Array[Byte]): Unit; +//} +// +//case class DefaultQueueSubscriber( messageProcessor: (Array[Byte]) => Unit ) extends QueueSubscriber { +//// def messageReceived(consumerTag: String, envelop: Envelope, properties: AMQP.BasicProperties, payload: Array[Byte]): Unit = { +//// messageProcessor(payload); +//// } +// //import com.rabbitmq.client.AMQP +// +// +// @throws[IOException] +// def handleDelivery(consumerTag: String, envelop: Envelope, properties: AMQP.BasicProperties, payload: Array[Byte]): Unit = { +// messageProcessor(payload); +// } +// +//} + object RabbitConnection { def disconnect(implicit rabbitConnection: RabbitConnection): Try[Unit] = { @@ -32,7 +84,7 @@ object RabbitConnection { val response = for { message <- Try(rabbitConnection.nextMessage()) json <- asJson(message) - } yield RabbitMessage(json) + } yield RabbitJsonMessage(json) response match { case Success(msg) => msg @@ -40,6 +92,47 @@ object RabbitConnection { } } +// def subscribe(queueName: String, consumer:Consumer) (implicit rabbitConnection: RabbitConnection): Unit = { +// rabbitConnection.channel.basicConsume(queueName, false, consumer) +// } + + + def newNextPayload(queueName: String, messageParser: MessageParser = PlainMessageParser )(implicit rabbitConnection: RabbitConnection): RabbitResponse = { + + val response = for { + message <- Try(rabbitConnection.nextMessage()) + rabbitResponse <- messageParser.msgParser(message) + } yield rabbitResponse + + response match { + case Failure(th) => NoMoreMessages + case Success(rabbitResp) => rabbitResp + } + } + + + // Function takes queueName, messageParser and returns a function which takes in a callback and return nothing. + // Return function will be invoked by the consumer registering a callbackFunction + // when message is received 'handleDelivery' is invoked which starts a future which will parse message and pass it callback function on complete + def registerListener(queueName:String, messageParser: MessageParser = PlainMessageParser)(implicit rabbitConnection: RabbitConnection): ((Try[RabbitResponse] => Unit) => String) = { + + val definedConsumer: (Try[RabbitResponse] => Unit) => String = (callbackFun) => { + rabbitConnection.channel.basicConsume(queueName, false, new DefaultConsumer(rabbitConnection.channel) { + + @throws(classOf[IOException]) + override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = { + Future { + messageParser.msgParser(body) match { + case Failure(th) => RabbitException(th) + case Success(rabbitResponse) => RabbitMessage(rabbitResponse, properties.getHeaders) + } + }.onComplete(callbackFun) + } // end handle + }) // end consume + } + definedConsumer + } + def asJson(payload: Array[Byte]): Try[Json] = new String(payload, "UTF-8").parse match { case Right(json) => Success(json) @@ -47,6 +140,4 @@ object RabbitConnection { } } -case class RabbitConnection(connection: Connection, channel: Channel, nextMessage: () => Array[Byte]) - - +case class RabbitConnection(connection: Connection, channel: Channel, nextMessage: () => Array[Byte]) \ No newline at end of file diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala index 4f758e5..4f4cb3a 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala @@ -1,6 +1,7 @@ package com.ppb.rabbitconsumer import argonaut._ +import com.rabbitmq.client.Consumer import scala.collection.JavaConverters._ import com.typesafe.config.{Config, ConfigFactory} @@ -11,6 +12,9 @@ import scalaz.concurrent.Task import scalaz.stream._ case class Cxn(filename: String, nextMessage: () => RabbitResponse, disconnect: () => Try[Unit]) +case class Sxn(subscription:((Try[RabbitResponse] => Unit) => String), disconnect: () => Try[Unit]) + + case class Configurations(name: String, configs: List[Config]) @@ -32,34 +36,60 @@ object RabbitConsumer { } val getMessagesPerConnection: Cxn => Process[Task, Unit] = cxn => - getMessages(cxn.nextMessage).toSource pipe text.utf8Encode to io.fileChunkW(cxn.filename) + getMessagesWithPreamble(cxn.nextMessage).toSource pipe text.utf8Encode to io.fileChunkW(cxn.filename) val read: (String) => Unit = getConfigs _ andThen consumeMessages(ConnectionService.init, getMessagesPerConnection) - def consumeMessages(getCxn: Config => Cxn, getMessages: Cxn => Process[Task, Unit])(c: Configurations): Unit = { + def consumeMessages(getCxn: Config => Cxn, fxMessages: Cxn => Process[Task, Unit])(c: Configurations): Unit = { c.configs.map(getCxn) foreach { cxn => { - getMessages(cxn).run.run + fxMessages(cxn).run.run cxn.disconnect() } } logger.info(s"Done receiving ${c.name} messages") - logger.info(s"""When you're done testing, run "R.done("${c.name}") to delete the following Rabbit queues:""") - c.configs.foreach { config => - logger.info(s"- ${config.getString("queue")}") - } + //logger.info(s"""When you're done testing, run "R.done("${c.name}") to delete the following Rabbit queues:""") + //c.configs.foreach { config => + // logger.info(s"- ${config.getString("queue")}") + // } + } + + def subscribeMessages(getSxn: Config => Sxn, callbackFunction:(Try[RabbitResponse]=>Unit))(c: Configurations): Unit = { + c.configs.map(getSxn) foreach { sxn => { + logger.info("Subscribing , please wait .....") + sxn.subscription(callbackFunction) + } + } } - private def getMessages(nextMessage: () => RabbitResponse): Process0[String] = - Process(jsonPreamble) ++ - (receiveAll(nextMessage) map (_.spaces2) intersperse ",") ++ - Process(jsonPostamble) - def receiveAll(nextMessage: () => RabbitResponse): Process0[Json] = + def getMessagesWithPreamble(nextMessage: () => RabbitResponse): Process0[String] = + Process(jsonPreamble) ++ getMessages(nextMessage) ++ Process(jsonPostamble) + + + def getMessages(nextMessage: () => RabbitResponse): Process0[String] = + (receiveAllAny(nextMessage) match { + case ss:Process0[String] => ss intersperse("\n") + case kk:Process0[Json] => kk map(_.spaces2) intersperse "," + }) + +// +// Process(jsonPreamble) ++ +// (receiveAll(nextMessage) map (_.spaces2) intersperse ",") ++ +// Process(jsonPostamble) + +// def receiveAll(nextMessage: () => RabbitResponse): Process0[Json] = +// nextMessage() match { +// case RabbitMessage(json) => Process.emit(json) ++ receiveAll(nextMessage) +// case NoMoreMessages => Process.halt +// } + + def receiveAllAny(nextMessage: () => RabbitResponse): Process0[Any] = nextMessage() match { - case RabbitMessage(json) => Process.emit(json) ++ receiveAll(nextMessage) - case NoMoreMessages => Process.halt + case RabbitJsonMessage(json) => Process.emit(json) ++ receiveAllAny(nextMessage) + case NoMoreMessages => Process.halt + case RabbitPlainMessage(plainPayload) => Process.emit(plainPayload) ++ receiveAllAny(nextMessage) } } diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala new file mode 100644 index 0000000..cd9075a --- /dev/null +++ b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala @@ -0,0 +1,46 @@ +package com.ppb.rabbitconsumer.examples + +import java.io.File + +import com.ppb.rabbitconsumer.{Configurations, ConnectionService, Cxn, RabbitConsumer} +import com.typesafe.config.{Config, ConfigFactory} + +import scala.collection.JavaConverters._ +import scalaz.concurrent.Task +import scalaz.stream._ + +object RabbitConsumerApp extends App { + + + def getConfigsFromFile(configFile: File): Configurations = + Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) + + val cxxn: (Config) => Cxn = (config) => + ConnectionService.newInit(config) + + val processMessages: Cxn => Process[Task, Unit] = cxn => + RabbitConsumer.getMessages(cxn.nextMessage).toSource pipe text.utf8Encode to io.chunkW(System.out) + + + val consume:(File) => Unit = getConfigsFromFile _ andThen RabbitConsumer.consumeMessages(cxxn, processMessages) + + val exitUsage:(String) => Unit = (message ) => { + println(s"${message} \n\t valid command line arguments are \n") + System.exit(1); + } + + + ///////// execution starts ////////////// + + println( "\n\n\t Rabbit Consumer App") + println( "\n\n\t Copyright (c) \n") + println( s"\t Version := ${System.getProperty("prog.version")} , revision := ${System.getProperty("prog.revision")}\n ") + + + if (args.length == 0) exitUsage("") + else { + consume(new File(args(0))) + } + println(" All done , exiting") + +} diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala new file mode 100644 index 0000000..e7889d6 --- /dev/null +++ b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala @@ -0,0 +1,61 @@ +package com.ppb.rabbitconsumer.examples + +import java.io.File + +import com.ppb.rabbitconsumer._ +import com.typesafe.config.ConfigFactory + +import scala.collection.JavaConverters._ +import scala.util.Try + +object RabbitSubscribeApp extends App { + + + def getConfigsFromFile(configFile: File): Configurations = + Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) + + + // the callback function which will take a rabbit response and print result + // result is either RabbitMessage or Exception + val subscritpionFun: (Try[RabbitResponse]) => Unit = (response) =>{ + println(" \n receiving response \n--------------------------------------------------------") + response.getOrElse(NoMoreMessages) match { + case RabbitMessage(rabbitResponse, header) => { + println("Header = "+header) + rabbitResponse match { + case RabbitPlainMessage(message) => println(message) + case RabbitJsonMessage(json) => print(json.spaces2) + } + } + case RabbitException(th)=> th.printStackTrace() + } +} + + + + + val subscribe:(File) => Unit = getConfigsFromFile _ andThen RabbitConsumer.subscribeMessages(ConnectionService.subscribe , subscritpionFun) + + + val exitUsage:(String) => Unit = (message ) => { + println(s"${message} \n\t valid command line arguments are \n") + System.exit(0); + } + + + ///////// execution starts ////////////// + + println( "\n\n\t Rabbit Consumer App") + println( "\n\n\t Copyright (c) \n") + println( s"\t Version := ${System.getProperty("prog.version")} , revision := ${System.getProperty("prog.revision")}\n ") + + + if (args.length == 0) { + exitUsage("") + } + else { + subscribe(new File(args(0))) + } + // println(" All done , exiting") + +} diff --git a/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala index 3a71635..5cea206 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala @@ -21,7 +21,7 @@ class RabbitConsumerSpec extends FlatSpec with Matchers with MockitoSugar { behavior of "RabbitConsumer" it should "receive all messages" in new RabbitConsumerFixture { - val message = RabbitConsumer.receiveAll(receiveOneMessage).toSource.runLog.run + val message = RabbitConsumer.receiveAllAny(receiveOneMessage).toSource.runLog.run message should have size 1 } @@ -62,7 +62,7 @@ trait RabbitConsumerFixture { val receiveOneMessage: () => RabbitResponse = () => if (times == 0) { times = 1 - RabbitMessage("".asJson) + RabbitJsonMessage("".asJson) } else { NoMoreMessages } From c7d17605ccf5229cb24cfa573145803ac02ec24d Mon Sep 17 00:00:00 2001 From: Ainul Habib Date: Mon, 29 Jan 2018 08:40:40 +0000 Subject: [PATCH 2/6] fixed breaking Unit test --- src/it/resources/local.conf | 7 ++++-- .../rabbitconsumer/RabbitConsumerITSpec.scala | 22 +++++++++++++------ src/test/resources/local.conf | 4 ++-- .../rabbitconsumer/RabbitConnectionSpec.scala | 2 +- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/it/resources/local.conf b/src/it/resources/local.conf index e39cec9..aea77e8 100644 --- a/src/it/resources/local.conf +++ b/src/it/resources/local.conf @@ -9,7 +9,10 @@ amqp { exchangeName = "myExchange" queue = "dwimyQueue" routingKey = "" - fileName = "~/output1.json" + exchangeName = "exchange.dmUpstream" + routingKey = "detest" + queue = "dmTestQueue" + fileName = "target/outputIT1.json" }, { ip = "localhost" @@ -20,7 +23,7 @@ amqp { exchangeName = "myExchange" queue = "dwimyQueue" routingKey = "" - fileName = "~/output2.json" + fileName = "target/outputIT2.json" } ] } diff --git a/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala b/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala index fe1ab69..9e8a37a 100644 --- a/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala +++ b/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala @@ -1,18 +1,26 @@ package com.ppb.rabbitconsumer -import argonaut._ -import Argonaut._ +import java.io.File + +import com.typesafe.config.ConfigFactory import org.scalatest.{FlatSpec, Matchers} -import org.slf4j.LoggerFactory -import scala.io.Source -import scala.util.{Failure, Success, Try} + import scala.collection.JavaConverters._ -import com.typesafe.config.{Config, ConfigFactory} + class RabbitConsumerITSpec extends FlatSpec with Matchers { it should "read docker local configuration files and process it" in { RabbitConsumer.local - } + + val configFile:File = new File("src/it/resources/local.conf") + val config:Configurations = Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) + config.configs.foreach( (x) => { + val file:File = new File(ConfigService.getFilename(x)) + file.exists() shouldBe true + }) + } + + } diff --git a/src/test/resources/local.conf b/src/test/resources/local.conf index 6afe7df..4a1c995 100644 --- a/src/test/resources/local.conf +++ b/src/test/resources/local.conf @@ -9,7 +9,7 @@ amqp { exchangeName = "myExchange" queue = "myQueue" routingKey = "myRoutingKey" - fileName = "~/output1.json" + fileName = "target/output1.json" }, { ip = "localhost" @@ -20,7 +20,7 @@ amqp { exchangeName = "myExchange" queue = "myQueue" routingKey = "someQueue" - fileName = "~/output2.json" + fileName = "target/output2.json" } ] } diff --git a/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala index 4fa4306..90de711 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala @@ -88,7 +88,7 @@ class RabbitConnectionSpec extends FlatSpec with Matchers with MockitoSugar { val rabbitConnection = RabbitConnection(connection, channel, nextMessage) val message: RabbitResponse = RabbitConnection.nextPayload("someQueue")(rabbitConnection) - message shouldBe a[RabbitMessage] + message shouldBe a[RabbitJsonMessage] } it should "return a NoMoreMessages object from the RabbitMq Broker when no payload is available" in { From 79619d8e43edbf58ca9c50eb646a4c8812daa487 Mon Sep 17 00:00:00 2001 From: Ainul Habib Date: Mon, 29 Jan 2018 10:29:36 +0000 Subject: [PATCH 3/6] commented out publish task because it was failing the build --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 373093a..6584295 100644 --- a/build.sbt +++ b/build.sbt @@ -55,8 +55,8 @@ val mockito = Seq ( "org.mockito" % "mockito-core" % mockitoV % "test" ) - -publishTo := Some(Resolver.file("file", new File(System.getenv("M2_REPO")))) +// profile to use for local build only +//publishTo := Some(Resolver.file("file", new File(System.getenv("M2_REPO")))) From 0738d495023148c08c9f9a16c4780eebebe85a5c Mon Sep 17 00:00:00 2001 From: Ainul Habib Date: Mon, 29 Jan 2018 10:49:18 +0000 Subject: [PATCH 4/6] fixed error in local.conf file --- src/it/resources/local.conf | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/it/resources/local.conf b/src/it/resources/local.conf index aea77e8..985d9d2 100644 --- a/src/it/resources/local.conf +++ b/src/it/resources/local.conf @@ -9,9 +9,7 @@ amqp { exchangeName = "myExchange" queue = "dwimyQueue" routingKey = "" - exchangeName = "exchange.dmUpstream" - routingKey = "detest" - queue = "dmTestQueue" + queue = "dmTestQueue" fileName = "target/outputIT1.json" }, { From 11f51f953071680b7aaae25d4eedc29e1e38894b Mon Sep 17 00:00:00 2001 From: Ainul Habib Date: Wed, 31 Jan 2018 08:21:23 +0000 Subject: [PATCH 5/6] Updated build.sbt to include publish to local repo --- build.sbt | 6 ++++-- .../scala/com/ppb/rabbitconsumer/RabbitConnection.scala | 8 ++++---- .../scala/com/ppb/rabbitconsumer/RabbitConsumer.scala | 9 +++++---- .../ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala | 5 ++++- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/build.sbt b/build.sbt index 6584295..7e824f1 100644 --- a/build.sbt +++ b/build.sbt @@ -4,6 +4,8 @@ import Tests._ name := "Rabbit Consumer" +organization := "com.paddypowerbetfair" + version := "0.1" scalaVersion := "2.12.3" @@ -55,8 +57,8 @@ val mockito = Seq ( "org.mockito" % "mockito-core" % mockitoV % "test" ) -// profile to use for local build only -//publishTo := Some(Resolver.file("file", new File(System.getenv("M2_REPO")))) +// to use for local build only +publishTo := Some(Resolver.file("file", new File(Resolver.mavenLocal.root))) diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala index 8fb2d27..9faa963 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala @@ -17,10 +17,10 @@ sealed trait MessageParser { def msgParser(load:Array[Byte]):Try[RabbitResponse case object PlainMessageParser extends MessageParser { override def msgParser(load: Array[Byte]): Try[RabbitResponse] = { - new String(load, "UTF-8") match { - case null => Failure(new Exception("Null Payload")) - case (text) => Success(RabbitPlainMessage(text)) - } + Option(new String(load, "UTF-8")) match { + case None => Failure(new IllegalArgumentException("payload is null")) + case Some(text) => Success(RabbitPlainMessage(text)) + } } } diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala index 4f4cb3a..03554a3 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala @@ -66,14 +66,14 @@ object RabbitConsumer { def getMessagesWithPreamble(nextMessage: () => RabbitResponse): Process0[String] = - Process(jsonPreamble) ++ getMessages(nextMessage) ++ Process(jsonPostamble) + Process(jsonPreamble) ++ getMessages(nextMessage) ++ Process(jsonPostamble) //getMessages(nextMessage) def getMessages(nextMessage: () => RabbitResponse): Process0[String] = - (receiveAllAny(nextMessage) match { + receiveAllAny(nextMessage) match { case ss:Process0[String] => ss intersperse("\n") case kk:Process0[Json] => kk map(_.spaces2) intersperse "," - }) + } // // Process(jsonPreamble) ++ @@ -89,7 +89,8 @@ object RabbitConsumer { def receiveAllAny(nextMessage: () => RabbitResponse): Process0[Any] = nextMessage() match { case RabbitJsonMessage(json) => Process.emit(json) ++ receiveAllAny(nextMessage) - case NoMoreMessages => Process.halt + // case NoMoreMessages => Process.halt case RabbitPlainMessage(plainPayload) => Process.emit(plainPayload) ++ receiveAllAny(nextMessage) + case _ => Process.halt } } diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala index e7889d6..7fa7016 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala @@ -24,10 +24,13 @@ object RabbitSubscribeApp extends App { println("Header = "+header) rabbitResponse match { case RabbitPlainMessage(message) => println(message) - case RabbitJsonMessage(json) => print(json.spaces2) + case RabbitJsonMessage(json) => println(json.spaces2) + case _ => {} } } case RabbitException(th)=> th.printStackTrace() + case NoMoreMessages => {} + case _ => {} } } From f72c747efb00c9c53de643f1b74cf63185787e5c Mon Sep 17 00:00:00 2001 From: Ainul Habib Date: Mon, 5 Mar 2018 21:37:03 +0000 Subject: [PATCH 6/6] updated code to stream rabbit message to sink --- docker/resources/rabbitmq/definitions.json | 132 +++++++++++++----- src/it/resources/local.conf | 25 ++-- .../rabbitconsumer/RabbitConsumerITSpec.scala | 69 +++++++-- .../rabbitconsumer/ConnectionService.scala | 55 ++++---- .../ppb/rabbitconsumer/RabbitConnection.scala | 99 ++++++------- .../ppb/rabbitconsumer/RabbitConsumer.scala | 115 +++++++++++---- .../examples/RabbitConsumerApp.scala | 46 ------ .../examples/RabbitExampleApp.scala | 79 +++++++++++ .../examples/RabbitSubscribeApp.scala | 64 --------- .../rabbitconsumer/ConfigServiceSpec.scala | 1 + .../ppb/rabbitconsumer/OnReceiveSpec.scala | 67 +++++++++ .../rabbitconsumer/RabbitConnectionSpec.scala | 5 +- .../rabbitconsumer/RabbitConsumerSpec.scala | 10 +- 13 files changed, 472 insertions(+), 295 deletions(-) delete mode 100644 src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala create mode 100644 src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala delete mode 100644 src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala create mode 100644 src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala diff --git a/docker/resources/rabbitmq/definitions.json b/docker/resources/rabbitmq/definitions.json index 86ac8ea..09dcbf5 100644 --- a/docker/resources/rabbitmq/definitions.json +++ b/docker/resources/rabbitmq/definitions.json @@ -1,39 +1,97 @@ { - "rabbit_version": "3.6.9", - "users": [ - { - "name": "guest", - "password_hash": "FkYR4/EokTofabv5M6BrcganRjk/OjgbjT70+yIz0Ms05liE", - "hashing_algorithm": "rabbit_password_hashing_sha256", - "tags": "administrator" - } - ], - "vhosts": [ - { - "name": "/" - } - ], - "permissions": [ - { - "user": "guest", - "vhost": "/", - "configure": ".*", - "write": ".*", - "read": ".*" - } - ], - "parameters": [], - "policies": [], - "queues": [], - "exchanges": [ - { - "name": "myExchange", - "vhost": "/", - "type": "fanout", - "durable": true, - "auto_delete": false, - "internal": false, - "arguments": {} - } - ] + "rabbit_version":"3.7.2", + "users":[ + { + "name":"guest", + "password_hash":"FkYR4/EokTofabv5M6BrcganRjk/OjgbjT70+yIz0Ms05liE", + "hashing_algorithm":"rabbit_password_hashing_sha256", + "tags":"administrator" + } + ], + "vhosts":[ + { + "name":"/" + } + ], + "permissions":[ + { + "user":"guest", + "vhost":"/", + "configure":".*", + "write":".*", + "read":".*" + } + ], + "topic_permissions":[ + + ], + "parameters":[ + + ], + "global_parameters":[ + { + "name":"cluster_name", + "value":"rabbit@ppb-rabbit-server" + } + ], + "policies":[ + + ], + "queues":[ + { + "name":"myTestQueue", + "vhost":"/", + "durable":false, + "auto_delete":false, + "arguments":{ + + } + }, + + { + "name":"myItTestSecondQueue", + "vhost":"/", + "durable":false, + "auto_delete":false, + "arguments":{ + + } + } + ], + "exchanges":[ + { + "name":"myExchange", + "vhost":"/", + "type":"fanout", + "durable":false, + "auto_delete":false, + "internal":false, + "arguments":{ + + } + } + ], + "bindings":[ + { + "source":"myExchange", + "vhost":"/", + "destination":"myTestQueue", + "destination_type":"queue", + "routing_key":"myItTest", + "arguments":{ + + } + }, + + { + "source":"myExchange", + "vhost":"/", + "destination":"myItTestSecondQueue", + "destination_type":"queue", + "routing_key":"myItTestSecond", + "arguments":{ + + } + } + ] } diff --git a/src/it/resources/local.conf b/src/it/resources/local.conf index 985d9d2..799e51a 100644 --- a/src/it/resources/local.conf +++ b/src/it/resources/local.conf @@ -1,27 +1,32 @@ amqp { connections = [ { - ip = "localhost" + name = "uat" + ip = "127.0.0.1" port = 5672 user = "guest" password = "guest" useSSL = false + vhost = "/", exchangeName = "myExchange" - queue = "dwimyQueue" - routingKey = "" - queue = "dmTestQueue" - fileName = "target/outputIT1.json" - }, + routingKey = "myItTest" + queue = "myTestQueue" + fileName = "target/myTestQueue.log" + } { - ip = "localhost" + name = "uat" + ip = "127.0.0.1" port = 5672 user = "guest" password = "guest" useSSL = false + vhost = "/", exchangeName = "myExchange" - queue = "dwimyQueue" - routingKey = "" - fileName = "target/outputIT2.json" + routingKey = "myItTestSecond" + queue = "myItTestSecondQueue" + fileName = "target/myItTestSecondQueue.log" } ] } + + diff --git a/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala b/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala index 9e8a37a..794154b 100644 --- a/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala +++ b/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala @@ -1,26 +1,73 @@ package com.ppb.rabbitconsumer import java.io.File +import java.nio.file.{Files, Paths} +import java.util.UUID -import com.typesafe.config.ConfigFactory -import org.scalatest.{FlatSpec, Matchers} +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra.Configurations +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest._ import scala.collection.JavaConverters._ +import argonaut._ +import Argonaut._ +import scala.io.Source +//import scalaz.stream.{Sink, _} -class RabbitConsumerITSpec extends FlatSpec with Matchers { +class RabbitConsumerITSpec extends FlatSpec { - it should "read docker local configuration files and process it" in { - RabbitConsumer.local + val configFile:String = "src/it/resources/local.conf" + val configurations:Configurations = Configurations(configFile, ConfigFactory.parseFile(new File(configFile)).getConfigList("amqp.connections").asScala.toList) - val configFile:File = new File("src/it/resources/local.conf") - val config:Configurations = Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) - config.configs.foreach( (x) => { - val file:File = new File(ConfigService.getFilename(x)) - file.exists() shouldBe true - }) + val setUp:Config => Unit = (config) => { + println("Inside setup "+ config) + val rabbitConn:RabbitConnection = ConnectionService.rabbitConnection(config) + val queueName = ConfigService.readQueue(config) + rabbitConn.channel.queuePurge(queueName) + println(" Publishing dummy to queue "+ queueName) + for (i <- 1 to 3) { + val map:Map[String, String] = Map("QueueName"->queueName, "message" -> UUID.randomUUID().toString, "ID"-> i.toString) + rabbitConn.channel.basicPublish("", queueName, null, map.asJson.toString().getBytes()) } + RabbitConsumer.readFromFile(configFile) + + println("closing connection") + rabbitConn.connection.close(1000) + Option(ConfigService.getFilename(config)) map( + outputfile => { Files.deleteIfExists(Paths.get(outputfile)) } + )getOrElse( println (" To Stdout") ) + + } + + val verifyOrFail:Config => Unit = (config) => { + Option(ConfigService.getFilename(config)) map( + outputfile => { + assert(Files.exists(Paths.get(outputfile))) + assert(Files.size(Paths.get(outputfile)) > 0) + println("============== "+ outputfile +"============") ; + scala.io.Source.fromFile(outputfile).getLines foreach println _ + } + )getOrElse( ) + } + + + + configurations.configs.map(setUp) + + it should "read stream message to local file it" in { + RabbitConsumer.readFromFile(configFile) + + configurations.configs.map(verifyOrFail) + +// configurations.configs.foreach( +// (x) => { +// println("============== "+x +"============") ; +// scala.io.Source.fromFile(ConfigService.getFilename(x)).getLines foreach println _ +// } +// ) + } } diff --git a/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala b/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala index 054d944..9a7a3f1 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala @@ -1,21 +1,17 @@ package com.ppb.rabbitconsumer -import java.io.IOException - -import com.rabbitmq.client.{AMQP, ConnectionFactory, Envelope} -import com.typesafe.config.Config -import argonaut._ import com.ppb.rabbitconsumer.ConfigService.{getFilename, readExchange, readQueue, readRoutingKey} import com.ppb.rabbitconsumer.RabbitConnection._ +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra.{SingleResponseConnection, SubscriptionConnection} +import com.rabbitmq.client.ConnectionFactory +import com.typesafe.config.Config import org.slf4j.LoggerFactory +import scodec.bits.ByteVector +import scalaz.Sink +import scalaz.concurrent.Task +import scalaz.stream.{Sink, io, sink} -sealed trait RabbitResponse extends Product with Serializable -case object NoMoreMessages extends RabbitResponse -case class RabbitJsonMessage(payload: Json) extends RabbitResponse -case class RabbitPlainMessage(plainPayload: String) extends RabbitResponse -case class RabbitMessage(rabbitResponse: RabbitResponse, map: java.util.Map[String, AnyRef]) extends RabbitResponse -case class RabbitException(throwable:Throwable) extends RabbitResponse object ConnectionService { @@ -46,47 +42,48 @@ object ConnectionService { RabbitConnection(connection, channel, nextMessage) } - def init(config: Config): Cxn = { - implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) + def createConnection(config: Config) : SingleResponseConnection = { + implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) val queueName = readQueue(config) val exchangeName = readExchange(config) val routingKey = readRoutingKey(config) - - createQueue(queueName) bindQueueToExchange(queueName, exchangeName, routingKey) - Cxn(getFilename(config), () => RabbitConnection.nextPayload(queueName), () => RabbitConnection.disconnect) + val sink = Option(getFilename(config)) map { + fileNameDefined => io.fileChunkW(fileNameDefined) + } getOrElse io.stdOutBytes + SingleResponseConnection(() => RabbitConnection.readNextPayload(queueName), () => RabbitConnection.disconnect, sink) } - def newInit(config: Config, ifCreateQueue:Boolean = false): Cxn = { - implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) - val queueName = readQueue(config) - val exchangeName = readExchange(config) - val routingKey = readRoutingKey(config) - if (ifCreateQueue) { - createQueue(queueName) - } - bindQueueToExchange(queueName, exchangeName, routingKey) - Cxn("", () => RabbitConnection.newNextPayload(queueName), () => RabbitConnection.disconnect) - } + // def init(config: Config, createQueue:Boolean = false, sink:Sink[Task, ByteVector]): SingleResponseConnection = { +// createConnection(config, createQueue, sink) +// } - def subscribe(config: Config): Sxn = { + + def subscribe(config: Config, messageParser:MessageParser = PlainMessageParser): SubscriptionConnection = { implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) val queueName = readQueue(config) val exchangeName = readExchange(config) val routingKey = readRoutingKey(config) + val fileName = getFilename(config) bindQueueToExchange(queueName, exchangeName, routingKey) - Sxn(RabbitConnection.registerListener(queueName), ()=>RabbitConnection.disconnect) + + + println(" Created a subscription with "+ queueName + " " +exchangeName+ " "+ routingKey ) + + SubscriptionConnection(RabbitConnection.registerListener(queueName, messageParser), ()=>RabbitConnection.disconnect, Option(fileName)) + + } diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala index 9faa963..2cc0ec6 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala @@ -1,16 +1,14 @@ package com.ppb.rabbitconsumer -import argonaut.Json +import java.io.IOException + +import argonaut.Argonaut._ +import argonaut.{Json, _} +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ import com.rabbitmq.client._ import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} -import argonaut._ -import Argonaut._ -import java.io.IOException - -import scala.concurrent.Future -import scala.concurrent.ExecutionContext.Implicits.global sealed trait MessageParser { def msgParser(load:Array[Byte]):Try[RabbitResponse] } @@ -36,29 +34,6 @@ case object JSONMessageParser extends MessageParser { -//trait QueueSubscriber { -// def subscribeNow(quene: String, consumer: (Array[Byte]) => Unit): Unit; -//} - -// -//trait QueueSubscriber extends Consumer { -// def messageReceived(consumerTag:String, envelop:Envelope, properties:AMQP.BasicProperties, payload:Array[Byte]): Unit; -//} -// -//case class DefaultQueueSubscriber( messageProcessor: (Array[Byte]) => Unit ) extends QueueSubscriber { -//// def messageReceived(consumerTag: String, envelop: Envelope, properties: AMQP.BasicProperties, payload: Array[Byte]): Unit = { -//// messageProcessor(payload); -//// } -// //import com.rabbitmq.client.AMQP -// -// -// @throws[IOException] -// def handleDelivery(consumerTag: String, envelop: Envelope, properties: AMQP.BasicProperties, payload: Array[Byte]): Unit = { -// messageProcessor(payload); -// } -// -//} - object RabbitConnection { def disconnect(implicit rabbitConnection: RabbitConnection): Try[Unit] = { @@ -80,24 +55,12 @@ object RabbitConnection { def deleteQueue(queueName: String)(implicit rabbitConnection: RabbitConnection): Unit = rabbitConnection.channel.queueDelete(queueName) + @Deprecated def nextPayload(queueName: String)(implicit rabbitConnection: RabbitConnection): RabbitResponse = { - val response = for { - message <- Try(rabbitConnection.nextMessage()) - json <- asJson(message) - } yield RabbitJsonMessage(json) - - response match { - case Success(msg) => msg - case Failure(th) => NoMoreMessages - } + readNextPayload(queueName , JSONMessageParser) } -// def subscribe(queueName: String, consumer:Consumer) (implicit rabbitConnection: RabbitConnection): Unit = { -// rabbitConnection.channel.basicConsume(queueName, false, consumer) -// } - - - def newNextPayload(queueName: String, messageParser: MessageParser = PlainMessageParser )(implicit rabbitConnection: RabbitConnection): RabbitResponse = { + def readNextPayload(queueName: String, messageParser: MessageParser = PlainMessageParser )(implicit rabbitConnection: RabbitConnection): RabbitResponse = { val response = for { message <- Try(rabbitConnection.nextMessage()) @@ -110,29 +73,45 @@ object RabbitConnection { } } + implicit class ScalaHeaderProperties(properties: BasicProperties) { + def headerAsScalaMap: Map[String, AnyRef] = { + Option(properties.getHeaders) map { props => + props.asScala.toMap[String, AnyRef] + } getOrElse Map.empty[String, AnyRef] + } + } - // Function takes queueName, messageParser and returns a function which takes in a callback and return nothing. - // Return function will be invoked by the consumer registering a callbackFunction - // when message is received 'handleDelivery' is invoked which starts a future which will parse message and pass it callback function on complete - def registerListener(queueName:String, messageParser: MessageParser = PlainMessageParser)(implicit rabbitConnection: RabbitConnection): ((Try[RabbitResponse] => Unit) => String) = { - - val definedConsumer: (Try[RabbitResponse] => Unit) => String = (callbackFun) => { + /** + * Function to register to queue for message notification. This method will subscribe to queue, will consume when + * message arrives in queue + * @param queueName + * @param messageParser : Custom Parser of type MessageParse + * @param rabbitConnection + * @return : A function Holder of type parameter 'OnReceive' returns String. + */ + def registerListener(queueName:String, messageParser: MessageParser = PlainMessageParser)(implicit rabbitConnection: RabbitConnection): (OnReceive) => String = { + + val definedConsumer:OnReceive => String = (callbackFun) => { rabbitConnection.channel.basicConsume(queueName, false, new DefaultConsumer(rabbitConnection.channel) { - @throws(classOf[IOException]) - override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = { - Future { - messageParser.msgParser(body) match { - case Failure(th) => RabbitException(th) - case Success(rabbitResponse) => RabbitMessage(rabbitResponse, properties.getHeaders) - } - }.onComplete(callbackFun) + @throws(classOf[IOException]) + override def handleDelivery(consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: Array[Byte]): Unit = { + + callbackFun(properties.getTimestamp, queueName, messageParser.msgParser(body), properties.headerAsScalaMap) } // end handle - }) // end consume + }) // end define consume } definedConsumer } + +// def publish(queueName:String, routingKey:String, message:String)(implicit rabbitConnection: RabbitConnection):Unit = { +// rabbitConnection.channel.basicPublish("", queueName, null, message.getBytes()) +// } + def asJson(payload: Array[Byte]): Try[Json] = new String(payload, "UTF-8").parse match { case Right(json) => Success(json) diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala index 03554a3..6062c5a 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala @@ -1,72 +1,126 @@ package com.ppb.rabbitconsumer +import java.io.File +import java.util.Date + import argonaut._ -import com.rabbitmq.client.Consumer import scala.collection.JavaConverters._ import com.typesafe.config.{Config, ConfigFactory} import org.slf4j.LoggerFactory +import scodec.bits.ByteVector import scala.util.Try import scalaz.concurrent.Task -import scalaz.stream._ +import scalaz.stream.{Sink, _} + + +object RabbitConsumerAlgebra { + + sealed trait RabbitResponse extends Product with Serializable + case object NoMoreMessages extends RabbitResponse + case class RabbitException(throwable:Throwable) extends RabbitResponse + + /** + * Message Format used for representing Json type message payload + * @param payload + */ + case class RabbitJsonMessage(payload: Json) extends RabbitResponse + + /** + * Message Format used to represent Simple String type message payload + * @param plainPayload + */ + case class RabbitPlainMessage(plainPayload: String) extends RabbitResponse + + /** + * Type OnReceive accepts timestamp as Long , queueName as String, RabbitResponse as try[RabbitResponse], and header as Map. + * It Returns a Unit + */ + type OnReceive = (Date, String, Try[RabbitResponse], Map[String, AnyRef]) => Unit + // type ProcessMessage = (SingleResponseConnection) => Process[Task, Unit] + // type MessageStreaming = (Any) => Process[Task, Unit] + + /** + * Class for holding Message reader and disconnect function from queue or exchange. + * //@param stream : stream Sink where response will be streamed + * @param nextMessage : Function which will read queue and return Rabbit Response + * @param disconnect : Function to disconnect from queue + */ +// case class SingleResponseConnection(filename: String, nextMessage: () => RabbitResponse, disconnect: () => Try[Unit]) + case class SingleResponseConnection(nextMessage: () => RabbitResponse, disconnect: () => Try[Unit], stream: Sink[Task, ByteVector] = io.stdOutBytes) + + /** + * Class for holding subscription to the queue + * @param subscription : of Type 'OnReceive' function holder, will be called when message arrives and consumed + * @param disconnect : disconnect from queue + */ + case class SubscriptionConnection(subscription:OnReceive => String, disconnect: () => Try[Unit], fileName:Option[String] = None) + + /** + * class for holding configuration + * @param name : Name of config + * @param configs : List of Configuration of type Config + */ + case class Configurations(name: String, configs: List[Config]) +} -case class Cxn(filename: String, nextMessage: () => RabbitResponse, disconnect: () => Try[Unit]) -case class Sxn(subscription:((Try[RabbitResponse] => Unit) => String), disconnect: () => Try[Unit]) +object RabbitConsumer { -case class Configurations(name: String, configs: List[Config]) + import RabbitConsumerAlgebra._ -object RabbitConsumer { - val jsonPreamble = "{\n \"all\": [" - val jsonPostamble = "]}" + //val jsonPreamble = "{\n \"all\": [" + //val jsonPostamble = "]}" private val logger = LoggerFactory.getLogger(RabbitConsumer.getClass) - def local(): Unit = read("local") + def local(): Unit = readFromResource("local") def done(configName: String): Unit = - getConfigs(configName).configs foreach ConnectionService.done + getConfigFromResource(configName).configs foreach ConnectionService.done - def getConfigs(configName: String): Configurations = { + def getConfigFromResource(configName: String): Configurations = { val configs = ConfigFactory.load(configName).getConfigList("amqp.connections").asScala.toList Configurations(configName, configs) } - val getMessagesPerConnection: Cxn => Process[Task, Unit] = cxn => - getMessagesWithPreamble(cxn.nextMessage).toSource pipe text.utf8Encode to io.fileChunkW(cxn.filename) - val read: (String) => Unit = getConfigs _ andThen consumeMessages(ConnectionService.init, getMessagesPerConnection) + def getConfigsFromFile(configFile: String): Configurations = + Configurations(configFile, ConfigFactory.parseFile(new File(configFile)).getConfigList("amqp.connections").asScala.toList) + + + val getMessagesPerConnection: (SingleResponseConnection) => Process[Task, Unit] = (cxn) => { + //getMessagesWithPreamble(cxn.nextMessage).toSource pipe text.utf8Encode to cxn.stream + getMessages(cxn.nextMessage).toSource pipe text.utf8Encode to cxn.stream + } + + + val readFromResource: (String) => Unit = getConfigFromResource _ andThen consumeMessages(ConnectionService.createConnection(_), getMessagesPerConnection) + val readFromFile: (String) => Unit = getConfigsFromFile _ andThen consumeMessages(ConnectionService.createConnection(_), getMessagesPerConnection) - def consumeMessages(getCxn: Config => Cxn, fxMessages: Cxn => Process[Task, Unit])(c: Configurations): Unit = { + + def consumeMessages(getCxn: Config => SingleResponseConnection, fxMessages: (SingleResponseConnection) => Process[Task, Unit])(c: Configurations): Unit = { c.configs.map(getCxn) foreach { cxn => { fxMessages(cxn).run.run cxn.disconnect() + } } - } + } - logger.info(s"Done receiving ${c.name} messages") + def subscribeMessages(createSubscriptionConnection: Config => SubscriptionConnection, onReceiveFun:Config => OnReceive)(c: Configurations): Unit = { + c.configs.map(config => createSubscriptionConnection(config).subscription(onReceiveFun(config))) - //logger.info(s"""When you're done testing, run "R.done("${c.name}") to delete the following Rabbit queues:""") - //c.configs.foreach { config => - // logger.info(s"- ${config.getString("queue")}") - // } } - def subscribeMessages(getSxn: Config => Sxn, callbackFunction:(Try[RabbitResponse]=>Unit))(c: Configurations): Unit = { - c.configs.map(getSxn) foreach { sxn => { - logger.info("Subscribing , please wait .....") - sxn.subscription(callbackFunction) - } - } - } - def getMessagesWithPreamble(nextMessage: () => RabbitResponse): Process0[String] = - Process(jsonPreamble) ++ getMessages(nextMessage) ++ Process(jsonPostamble) //getMessages(nextMessage) + @Deprecated + def getMessagesWithPreamble(nextMessage: () => RabbitResponse): Process0[String] = getMessages(nextMessage) + // Process(jsonPreamble) ++ getMessages(nextMessage) ++ Process(jsonPostamble) def getMessages(nextMessage: () => RabbitResponse): Process0[String] = @@ -89,7 +143,6 @@ object RabbitConsumer { def receiveAllAny(nextMessage: () => RabbitResponse): Process0[Any] = nextMessage() match { case RabbitJsonMessage(json) => Process.emit(json) ++ receiveAllAny(nextMessage) - // case NoMoreMessages => Process.halt case RabbitPlainMessage(plainPayload) => Process.emit(plainPayload) ++ receiveAllAny(nextMessage) case _ => Process.halt } diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala deleted file mode 100644 index cd9075a..0000000 --- a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitConsumerApp.scala +++ /dev/null @@ -1,46 +0,0 @@ -package com.ppb.rabbitconsumer.examples - -import java.io.File - -import com.ppb.rabbitconsumer.{Configurations, ConnectionService, Cxn, RabbitConsumer} -import com.typesafe.config.{Config, ConfigFactory} - -import scala.collection.JavaConverters._ -import scalaz.concurrent.Task -import scalaz.stream._ - -object RabbitConsumerApp extends App { - - - def getConfigsFromFile(configFile: File): Configurations = - Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) - - val cxxn: (Config) => Cxn = (config) => - ConnectionService.newInit(config) - - val processMessages: Cxn => Process[Task, Unit] = cxn => - RabbitConsumer.getMessages(cxn.nextMessage).toSource pipe text.utf8Encode to io.chunkW(System.out) - - - val consume:(File) => Unit = getConfigsFromFile _ andThen RabbitConsumer.consumeMessages(cxxn, processMessages) - - val exitUsage:(String) => Unit = (message ) => { - println(s"${message} \n\t valid command line arguments are \n") - System.exit(1); - } - - - ///////// execution starts ////////////// - - println( "\n\n\t Rabbit Consumer App") - println( "\n\n\t Copyright (c) \n") - println( s"\t Version := ${System.getProperty("prog.version")} , revision := ${System.getProperty("prog.revision")}\n ") - - - if (args.length == 0) exitUsage("") - else { - consume(new File(args(0))) - } - println(" All done , exiting") - -} diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala new file mode 100644 index 0000000..b09608b --- /dev/null +++ b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala @@ -0,0 +1,79 @@ +package com.ppb.rabbitconsumer.examples + +import java.io.File +import java.text.SimpleDateFormat + +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ +import com.ppb.rabbitconsumer.{ConnectionService, _} +import com.typesafe.config.{Config, ConfigFactory} + +import scala.collection.JavaConverters._ +import scalaz.stream.{Process, Process0, text, _} + +object RabbitExampleApp extends App { + + + + + def getConfigsFromFile(configFile: File): Configurations = + Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) + + + val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + + val onReceiveFun: (Config) => OnReceive = (config) => { + + lazy val outStream = Option(ConfigService.getFilename(config)).getOrElse(None) match { + case file:String => {io.fileChunkW(file, 4096, true)} + case None => {io.stdOutBytes} + } + + val myReceiveFun:OnReceive = (timestamp, queueName, response, header) => { + println(" Called on onReceive callback " + queueName + " response " + response) + + val headerStream = Option(header).getOrElse(None) match { + case headerVal => Process("Header={"+headerVal.toString+"}") + case None=> Process.emit("") + } + + + val responseStream = response.getOrElse(NoMoreMessages) match { + case RabbitPlainMessage(message) => Process.emit( message) + case RabbitJsonMessage(json) => Process.emit(json.spaces2) + case RabbitException(th) => Process.emit(th.toString) + case _ => Process.halt + } + val partialProc:Process0[String] = Process(dateFormat.format(timestamp)) ++ headerStream ++ responseStream ++ Process("\n") + + (partialProc.intersperse("\t").toSource pipe text.utf8Encode to outStream).run.run + } + + myReceiveFun + } + + + val doSubscribe:(File) => Unit = getConfigsFromFile _ andThen RabbitConsumer.subscribeMessages(ConnectionService.subscribe(_) , onReceiveFun ) + + val exitUsage:(String) => Unit = (message ) => { + println(s"${message} \n\t valid command line arguments are \n") + System.exit(0); + } + + + + + + println("\n\n\t Copyright (c) \n") + println(s"\t Version := ${System.getProperty("prog.version")} , revision := ${System.getProperty("prog.revision")}\n ") + println(" ============> args "+ args(0)) + if (args.length == 0) exitUsage("") + + if (args.length == 1) { + println("\n\n\t Rabbit Consumer App") + RabbitConsumer.readFromFile + } else { + println("\n\n\t Rabbit Subscriber App") + doSubscribe(new File(args(0))) + } + +} diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala deleted file mode 100644 index 7fa7016..0000000 --- a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitSubscribeApp.scala +++ /dev/null @@ -1,64 +0,0 @@ -package com.ppb.rabbitconsumer.examples - -import java.io.File - -import com.ppb.rabbitconsumer._ -import com.typesafe.config.ConfigFactory - -import scala.collection.JavaConverters._ -import scala.util.Try - -object RabbitSubscribeApp extends App { - - - def getConfigsFromFile(configFile: File): Configurations = - Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) - - - // the callback function which will take a rabbit response and print result - // result is either RabbitMessage or Exception - val subscritpionFun: (Try[RabbitResponse]) => Unit = (response) =>{ - println(" \n receiving response \n--------------------------------------------------------") - response.getOrElse(NoMoreMessages) match { - case RabbitMessage(rabbitResponse, header) => { - println("Header = "+header) - rabbitResponse match { - case RabbitPlainMessage(message) => println(message) - case RabbitJsonMessage(json) => println(json.spaces2) - case _ => {} - } - } - case RabbitException(th)=> th.printStackTrace() - case NoMoreMessages => {} - case _ => {} - } -} - - - - - val subscribe:(File) => Unit = getConfigsFromFile _ andThen RabbitConsumer.subscribeMessages(ConnectionService.subscribe , subscritpionFun) - - - val exitUsage:(String) => Unit = (message ) => { - println(s"${message} \n\t valid command line arguments are \n") - System.exit(0); - } - - - ///////// execution starts ////////////// - - println( "\n\n\t Rabbit Consumer App") - println( "\n\n\t Copyright (c) \n") - println( s"\t Version := ${System.getProperty("prog.version")} , revision := ${System.getProperty("prog.revision")}\n ") - - - if (args.length == 0) { - exitUsage("") - } - else { - subscribe(new File(args(0))) - } - // println(" All done , exiting") - -} diff --git a/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala index 53cd91c..6231ada 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala @@ -5,6 +5,7 @@ import org.scalatest.{FlatSpec, Matchers} import scala.collection.JavaConverters._ + class ConfigServiceSpec extends FlatSpec with Matchers { behavior of "ConfigService" diff --git a/src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala new file mode 100644 index 0000000..15f9fae --- /dev/null +++ b/src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala @@ -0,0 +1,67 @@ +package com.ppb.rabbitconsumer + +import java.text.SimpleDateFormat +import java.util +import java.util.{Calendar, Date} +import java.util.concurrent.Executors + +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra.{OnReceive, RabbitJsonMessage, RabbitPlainMessage, RabbitResponse} +import org.scalatest.FlatSpec +import argonaut._ +import Argonaut._ +import com.rabbitmq.client.BasicProperties +import org.scalatest.mockito.MockitoSugar +import org.mockito.Mockito._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success, Try} +import scalaz.concurrent.Future +import com.ppb.rabbitconsumer.RabbitConnection.ScalaHeaderProperties + +class OnReceiveSpec extends FlatSpec { + + val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + + val onMessageReceive : OnReceive = (timestamp, queue, response, header) => { + println(dateFormat.format(timestamp)+" Got Message from Queue " + queue) + println(" \t Header " + header) + println(" \t Message "+ response.getOrElse("ERRRRRRRR").toString ) + } + + def registerListener(queueName:String, messageParser: MessageParser = PlainMessageParser): OnReceive => String = { + + implicit val context = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + lazy val definedConsumer:OnReceive => String = (onReceiveFun) => { + println("inside definedConsumer") + val tryMsg = Try( RabbitPlainMessage("MESSAGE") ) + val header:java.util.Map[String,AnyRef] = new util.HashMap[String, AnyRef](); + header.put("key1",new String("value1")) + header.put("key2",new String("value2")) + header.put("key3",null) + + val properties: BasicProperties = MockitoSugar.mock[BasicProperties] + when(properties.getHeaders).thenReturn(header) + when(properties.getTimestamp).thenReturn(Calendar.getInstance().getTime()) + + val propertiesNullHeader: BasicProperties = MockitoSugar.mock[BasicProperties] + when(propertiesNullHeader.getHeaders).thenReturn(null) + + val f: Future[Unit] = Future { + onReceiveFun(properties.getTimestamp , "QueueName", tryMsg, properties.headerAsScalaMap ) + } + + f.start.after(1000) + + "CONNECTED" + } + definedConsumer; + } + + + + println("Testing the OnReceiveSpec") + //val connection = registerListener("queue").(onMessageReceive) + // println( connection ); + +} diff --git a/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala index 90de711..acadbee 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala @@ -1,5 +1,6 @@ package com.ppb.rabbitconsumer +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ import com.rabbitmq.client.AMQP.Queue.{BindOk, DeleteOk} import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk import com.rabbitmq.client.{Channel, Connection} @@ -87,7 +88,7 @@ class RabbitConnectionSpec extends FlatSpec with Matchers with MockitoSugar { val rabbitConnection = RabbitConnection(connection, channel, nextMessage) - val message: RabbitResponse = RabbitConnection.nextPayload("someQueue")(rabbitConnection) + val message: RabbitResponse = RabbitConnection.readNextPayload("someQueue", JSONMessageParser)(rabbitConnection) message shouldBe a[RabbitJsonMessage] } @@ -99,7 +100,7 @@ class RabbitConnectionSpec extends FlatSpec with Matchers with MockitoSugar { val rabbitConnection = RabbitConnection(connection, channel, nextMessage) - val message: RabbitResponse = RabbitConnection.nextPayload("someQueue")(rabbitConnection) + val message: RabbitResponse = RabbitConnection.readNextPayload("someQueue")(rabbitConnection) message shouldBe NoMoreMessages } diff --git a/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala index 5cea206..ed8cee2 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala @@ -2,6 +2,7 @@ package com.ppb.rabbitconsumer import argonaut._ import Argonaut._ +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ import com.typesafe.config.Config import org.scalatest.mockito.MockitoSugar import org.scalatest.{FlatSpec, Matchers} @@ -26,7 +27,7 @@ class RabbitConsumerSpec extends FlatSpec with Matchers with MockitoSugar { } it should "read configuration files" in { - val config: Configurations = RabbitConsumer.getConfigs("local") + val config: Configurations = RabbitConsumer.getConfigFromResource("local") config.name should be ("local") config.configs should have size 2 } @@ -39,14 +40,13 @@ class RabbitConsumerSpec extends FlatSpec with Matchers with MockitoSugar { } val myMock = mock[MyMock] - val getCxn: Config => Cxn = _ => Cxn("", () => NoMoreMessages, () => { myMock.soWasI(); Success(()) }) - - val getMessages: Cxn => Process[Task, Unit] = _ => { + val getCxn: Config => SingleResponseConnection = _ => SingleResponseConnection(() => NoMoreMessages, () => { myMock.soWasI(); Success(()) }) + val getMessages: SingleResponseConnection => Process[Task, Unit] = _ => { myMock.iWasCalled() Process(()).toSource } - val configs: Configurations = RabbitConsumer.getConfigs("local") + val configs: Configurations = RabbitConsumer.getConfigFromResource("local") RabbitConsumer.consumeMessages(getCxn, getMessages)(configs) verify(myMock, times(2)).iWasCalled()