diff --git a/build.sbt b/build.sbt index 5270d5d..7e824f1 100644 --- a/build.sbt +++ b/build.sbt @@ -4,6 +4,8 @@ import Tests._ name := "Rabbit Consumer" +organization := "com.paddypowerbetfair" + version := "0.1" scalaVersion := "2.12.3" @@ -55,6 +57,11 @@ val mockito = Seq ( "org.mockito" % "mockito-core" % mockitoV % "test" ) +// to use for local build only +publishTo := Some(Resolver.file("file", new File(Resolver.mavenLocal.root))) + + + libraryDependencies ++= logging ++ scalacheck ++ scalatest ++ amqpClient ++ scalaz ++ argonaut ++ typesafeConfig ++ mockito lazy val root = (project in file(".")) diff --git a/docker/resources/rabbitmq/definitions.json b/docker/resources/rabbitmq/definitions.json index 86ac8ea..09dcbf5 100644 --- a/docker/resources/rabbitmq/definitions.json +++ b/docker/resources/rabbitmq/definitions.json @@ -1,39 +1,97 @@ { - "rabbit_version": "3.6.9", - "users": [ - { - "name": "guest", - "password_hash": "FkYR4/EokTofabv5M6BrcganRjk/OjgbjT70+yIz0Ms05liE", - "hashing_algorithm": "rabbit_password_hashing_sha256", - "tags": "administrator" - } - ], - "vhosts": [ - { - "name": "/" - } - ], - "permissions": [ - { - "user": "guest", - "vhost": "/", - "configure": ".*", - "write": ".*", - "read": ".*" - } - ], - "parameters": [], - "policies": [], - "queues": [], - "exchanges": [ - { - "name": "myExchange", - "vhost": "/", - "type": "fanout", - "durable": true, - "auto_delete": false, - "internal": false, - "arguments": {} - } - ] + "rabbit_version":"3.7.2", + "users":[ + { + "name":"guest", + "password_hash":"FkYR4/EokTofabv5M6BrcganRjk/OjgbjT70+yIz0Ms05liE", + "hashing_algorithm":"rabbit_password_hashing_sha256", + "tags":"administrator" + } + ], + "vhosts":[ + { + "name":"/" + } + ], + "permissions":[ + { + "user":"guest", + "vhost":"/", + "configure":".*", + "write":".*", + "read":".*" + } + ], + "topic_permissions":[ + + ], + "parameters":[ + + ], + "global_parameters":[ + { + "name":"cluster_name", + "value":"rabbit@ppb-rabbit-server" + } + ], + "policies":[ + + ], + "queues":[ + { + "name":"myTestQueue", + "vhost":"/", + "durable":false, + "auto_delete":false, + "arguments":{ + + } + }, + + { + "name":"myItTestSecondQueue", + "vhost":"/", + "durable":false, + "auto_delete":false, + "arguments":{ + + } + } + ], + "exchanges":[ + { + "name":"myExchange", + "vhost":"/", + "type":"fanout", + "durable":false, + "auto_delete":false, + "internal":false, + "arguments":{ + + } + } + ], + "bindings":[ + { + "source":"myExchange", + "vhost":"/", + "destination":"myTestQueue", + "destination_type":"queue", + "routing_key":"myItTest", + "arguments":{ + + } + }, + + { + "source":"myExchange", + "vhost":"/", + "destination":"myItTestSecondQueue", + "destination_type":"queue", + "routing_key":"myItTestSecond", + "arguments":{ + + } + } + ] } diff --git a/src/it/resources/local.conf b/src/it/resources/local.conf index e39cec9..799e51a 100644 --- a/src/it/resources/local.conf +++ b/src/it/resources/local.conf @@ -1,26 +1,32 @@ amqp { connections = [ { - ip = "localhost" + name = "uat" + ip = "127.0.0.1" port = 5672 user = "guest" password = "guest" useSSL = false + vhost = "/", exchangeName = "myExchange" - queue = "dwimyQueue" - routingKey = "" - fileName = "~/output1.json" - }, + routingKey = "myItTest" + queue = "myTestQueue" + fileName = "target/myTestQueue.log" + } { - ip = "localhost" + name = "uat" + ip = "127.0.0.1" port = 5672 user = "guest" password = "guest" useSSL = false + vhost = "/", exchangeName = "myExchange" - queue = "dwimyQueue" - routingKey = "" - fileName = "~/output2.json" + routingKey = "myItTestSecond" + queue = "myItTestSecondQueue" + fileName = "target/myItTestSecondQueue.log" } ] } + + diff --git a/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala b/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala index fe1ab69..794154b 100644 --- a/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala +++ b/src/it/scala/com/ppb/rabbitconsumer/RabbitConsumerITSpec.scala @@ -1,18 +1,73 @@ package com.ppb.rabbitconsumer +import java.io.File +import java.nio.file.{Files, Paths} +import java.util.UUID + +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra.Configurations +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest._ + +import scala.collection.JavaConverters._ import argonaut._ import Argonaut._ -import org.scalatest.{FlatSpec, Matchers} -import org.slf4j.LoggerFactory + import scala.io.Source -import scala.util.{Failure, Success, Try} -import scala.collection.JavaConverters._ -import com.typesafe.config.{Config, ConfigFactory} +//import scalaz.stream.{Sink, _} + + +class RabbitConsumerITSpec extends FlatSpec { + + val configFile:String = "src/it/resources/local.conf" + val configurations:Configurations = Configurations(configFile, ConfigFactory.parseFile(new File(configFile)).getConfigList("amqp.connections").asScala.toList) + + val setUp:Config => Unit = (config) => { + println("Inside setup "+ config) + val rabbitConn:RabbitConnection = ConnectionService.rabbitConnection(config) + val queueName = ConfigService.readQueue(config) + rabbitConn.channel.queuePurge(queueName) + println(" Publishing dummy to queue "+ queueName) + for (i <- 1 to 3) { + val map:Map[String, String] = Map("QueueName"->queueName, "message" -> UUID.randomUUID().toString, "ID"-> i.toString) + rabbitConn.channel.basicPublish("", queueName, null, map.asJson.toString().getBytes()) + } -class RabbitConsumerITSpec extends FlatSpec with Matchers { + RabbitConsumer.readFromFile(configFile) + println("closing connection") + rabbitConn.connection.close(1000) + Option(ConfigService.getFilename(config)) map( + outputfile => { Files.deleteIfExists(Paths.get(outputfile)) } + )getOrElse( println (" To Stdout") ) - it should "read docker local configuration files and process it" in { - RabbitConsumer.local } + + val verifyOrFail:Config => Unit = (config) => { + Option(ConfigService.getFilename(config)) map( + outputfile => { + assert(Files.exists(Paths.get(outputfile))) + assert(Files.size(Paths.get(outputfile)) > 0) + println("============== "+ outputfile +"============") ; + scala.io.Source.fromFile(outputfile).getLines foreach println _ + } + )getOrElse( ) + } + + + + configurations.configs.map(setUp) + + it should "read stream message to local file it" in { + RabbitConsumer.readFromFile(configFile) + + configurations.configs.map(verifyOrFail) + +// configurations.configs.foreach( +// (x) => { +// println("============== "+x +"============") ; +// scala.io.Source.fromFile(ConfigService.getFilename(x)).getLines foreach println _ +// } +// ) + } + } diff --git a/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala b/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala index a6d4dcc..9a7a3f1 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/ConnectionService.scala @@ -1,16 +1,18 @@ package com.ppb.rabbitconsumer -import com.rabbitmq.client.ConnectionFactory -import com.typesafe.config.Config - -import argonaut._ import com.ppb.rabbitconsumer.ConfigService.{getFilename, readExchange, readQueue, readRoutingKey} import com.ppb.rabbitconsumer.RabbitConnection._ +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra.{SingleResponseConnection, SubscriptionConnection} +import com.rabbitmq.client.ConnectionFactory +import com.typesafe.config.Config import org.slf4j.LoggerFactory +import scodec.bits.ByteVector + +import scalaz.Sink +import scalaz.concurrent.Task +import scalaz.stream.{Sink, io, sink} + -sealed trait RabbitResponse extends Product with Serializable -case object NoMoreMessages extends RabbitResponse -case class RabbitMessage(payload: Json) extends RabbitResponse object ConnectionService { @@ -40,19 +42,53 @@ object ConnectionService { RabbitConnection(connection, channel, nextMessage) } - def init(config: Config): Cxn = { + + def createConnection(config: Config) : SingleResponseConnection = { + implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) + val queueName = readQueue(config) + val exchangeName = readExchange(config) + val routingKey = readRoutingKey(config) + bindQueueToExchange(queueName, exchangeName, routingKey) + + val sink = Option(getFilename(config)) map { + fileNameDefined => io.fileChunkW(fileNameDefined) + } getOrElse io.stdOutBytes + + SingleResponseConnection(() => RabbitConnection.readNextPayload(queueName), () => RabbitConnection.disconnect, sink) + } + + + + + + + // def init(config: Config, createQueue:Boolean = false, sink:Sink[Task, ByteVector]): SingleResponseConnection = { +// createConnection(config, createQueue, sink) +// } + + + def subscribe(config: Config, messageParser:MessageParser = PlainMessageParser): SubscriptionConnection = { implicit val rabbitConnnection: RabbitConnection = rabbitConnection(config) val queueName = readQueue(config) val exchangeName = readExchange(config) val routingKey = readRoutingKey(config) + val fileName = getFilename(config) - createQueue(queueName) bindQueueToExchange(queueName, exchangeName, routingKey) - Cxn(getFilename(config), () => RabbitConnection.nextPayload(queueName), () => RabbitConnection.disconnect) + + + println(" Created a subscription with "+ queueName + " " +exchangeName+ " "+ routingKey ) + + SubscriptionConnection(RabbitConnection.registerListener(queueName, messageParser), ()=>RabbitConnection.disconnect, Option(fileName)) + + } + + + def done(config: Config): Unit = { val queueName = readQueue(config) logger.info(s"Deleting $queueName") diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala index b29732f..2cc0ec6 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConnection.scala @@ -1,12 +1,39 @@ package com.ppb.rabbitconsumer -import argonaut.Json -import com.rabbitmq.client.{Channel, Connection} +import java.io.IOException + +import argonaut.Argonaut._ +import argonaut.{Json, _} +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ +import com.rabbitmq.client._ import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} -import argonaut._ -import Argonaut._ + + +sealed trait MessageParser { def msgParser(load:Array[Byte]):Try[RabbitResponse] } + +case object PlainMessageParser extends MessageParser { + override def msgParser(load: Array[Byte]): Try[RabbitResponse] = { + Option(new String(load, "UTF-8")) match { + case None => Failure(new IllegalArgumentException("payload is null")) + case Some(text) => Success(RabbitPlainMessage(text)) + } + } +} + +case object JSONMessageParser extends MessageParser { + override def msgParser(load: Array[Byte]): Try[RabbitResponse] = { + new String(load, "UTF-8").parse match { + case Right(json) => Success(RabbitJsonMessage(json)) + case Left(error) => Failure(new IllegalStateException(error)) + } + } +} + + + + object RabbitConnection { def disconnect(implicit rabbitConnection: RabbitConnection): Try[Unit] = { @@ -28,18 +55,63 @@ object RabbitConnection { def deleteQueue(queueName: String)(implicit rabbitConnection: RabbitConnection): Unit = rabbitConnection.channel.queueDelete(queueName) + @Deprecated def nextPayload(queueName: String)(implicit rabbitConnection: RabbitConnection): RabbitResponse = { + readNextPayload(queueName , JSONMessageParser) + } + + def readNextPayload(queueName: String, messageParser: MessageParser = PlainMessageParser )(implicit rabbitConnection: RabbitConnection): RabbitResponse = { + val response = for { message <- Try(rabbitConnection.nextMessage()) - json <- asJson(message) - } yield RabbitMessage(json) + rabbitResponse <- messageParser.msgParser(message) + } yield rabbitResponse response match { - case Success(msg) => msg case Failure(th) => NoMoreMessages + case Success(rabbitResp) => rabbitResp + } + } + + implicit class ScalaHeaderProperties(properties: BasicProperties) { + def headerAsScalaMap: Map[String, AnyRef] = { + Option(properties.getHeaders) map { props => + props.asScala.toMap[String, AnyRef] + } getOrElse Map.empty[String, AnyRef] } } + /** + * Function to register to queue for message notification. This method will subscribe to queue, will consume when + * message arrives in queue + * @param queueName + * @param messageParser : Custom Parser of type MessageParse + * @param rabbitConnection + * @return : A function Holder of type parameter 'OnReceive' returns String. + */ + def registerListener(queueName:String, messageParser: MessageParser = PlainMessageParser)(implicit rabbitConnection: RabbitConnection): (OnReceive) => String = { + + val definedConsumer:OnReceive => String = (callbackFun) => { + rabbitConnection.channel.basicConsume(queueName, false, new DefaultConsumer(rabbitConnection.channel) { + + @throws(classOf[IOException]) + override def handleDelivery(consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: Array[Byte]): Unit = { + + callbackFun(properties.getTimestamp, queueName, messageParser.msgParser(body), properties.headerAsScalaMap) + } // end handle + }) // end define consume + } + definedConsumer + } + + +// def publish(queueName:String, routingKey:String, message:String)(implicit rabbitConnection: RabbitConnection):Unit = { +// rabbitConnection.channel.basicPublish("", queueName, null, message.getBytes()) +// } + def asJson(payload: Array[Byte]): Try[Json] = new String(payload, "UTF-8").parse match { case Right(json) => Success(json) @@ -47,6 +119,4 @@ object RabbitConnection { } } -case class RabbitConnection(connection: Connection, channel: Channel, nextMessage: () => Array[Byte]) - - +case class RabbitConnection(connection: Connection, channel: Channel, nextMessage: () => Array[Byte]) \ No newline at end of file diff --git a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala index 4f758e5..6062c5a 100644 --- a/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala +++ b/src/main/scala/com/ppb/rabbitconsumer/RabbitConsumer.scala @@ -1,65 +1,149 @@ package com.ppb.rabbitconsumer +import java.io.File +import java.util.Date + import argonaut._ import scala.collection.JavaConverters._ import com.typesafe.config.{Config, ConfigFactory} import org.slf4j.LoggerFactory +import scodec.bits.ByteVector import scala.util.Try import scalaz.concurrent.Task -import scalaz.stream._ +import scalaz.stream.{Sink, _} + + +object RabbitConsumerAlgebra { + + sealed trait RabbitResponse extends Product with Serializable + case object NoMoreMessages extends RabbitResponse + case class RabbitException(throwable:Throwable) extends RabbitResponse + + /** + * Message Format used for representing Json type message payload + * @param payload + */ + case class RabbitJsonMessage(payload: Json) extends RabbitResponse + + /** + * Message Format used to represent Simple String type message payload + * @param plainPayload + */ + case class RabbitPlainMessage(plainPayload: String) extends RabbitResponse + + /** + * Type OnReceive accepts timestamp as Long , queueName as String, RabbitResponse as try[RabbitResponse], and header as Map. + * It Returns a Unit + */ + type OnReceive = (Date, String, Try[RabbitResponse], Map[String, AnyRef]) => Unit + // type ProcessMessage = (SingleResponseConnection) => Process[Task, Unit] + // type MessageStreaming = (Any) => Process[Task, Unit] + + /** + * Class for holding Message reader and disconnect function from queue or exchange. + * //@param stream : stream Sink where response will be streamed + * @param nextMessage : Function which will read queue and return Rabbit Response + * @param disconnect : Function to disconnect from queue + */ +// case class SingleResponseConnection(filename: String, nextMessage: () => RabbitResponse, disconnect: () => Try[Unit]) + case class SingleResponseConnection(nextMessage: () => RabbitResponse, disconnect: () => Try[Unit], stream: Sink[Task, ByteVector] = io.stdOutBytes) + + /** + * Class for holding subscription to the queue + * @param subscription : of Type 'OnReceive' function holder, will be called when message arrives and consumed + * @param disconnect : disconnect from queue + */ + case class SubscriptionConnection(subscription:OnReceive => String, disconnect: () => Try[Unit], fileName:Option[String] = None) + + /** + * class for holding configuration + * @param name : Name of config + * @param configs : List of Configuration of type Config + */ + case class Configurations(name: String, configs: List[Config]) +} -case class Cxn(filename: String, nextMessage: () => RabbitResponse, disconnect: () => Try[Unit]) -case class Configurations(name: String, configs: List[Config]) object RabbitConsumer { - val jsonPreamble = "{\n \"all\": [" - val jsonPostamble = "]}" + + import RabbitConsumerAlgebra._ + + //val jsonPreamble = "{\n \"all\": [" + //val jsonPostamble = "]}" private val logger = LoggerFactory.getLogger(RabbitConsumer.getClass) - def local(): Unit = read("local") + def local(): Unit = readFromResource("local") def done(configName: String): Unit = - getConfigs(configName).configs foreach ConnectionService.done + getConfigFromResource(configName).configs foreach ConnectionService.done - def getConfigs(configName: String): Configurations = { + def getConfigFromResource(configName: String): Configurations = { val configs = ConfigFactory.load(configName).getConfigList("amqp.connections").asScala.toList Configurations(configName, configs) } - val getMessagesPerConnection: Cxn => Process[Task, Unit] = cxn => - getMessages(cxn.nextMessage).toSource pipe text.utf8Encode to io.fileChunkW(cxn.filename) - val read: (String) => Unit = getConfigs _ andThen consumeMessages(ConnectionService.init, getMessagesPerConnection) + def getConfigsFromFile(configFile: String): Configurations = + Configurations(configFile, ConfigFactory.parseFile(new File(configFile)).getConfigList("amqp.connections").asScala.toList) + + + val getMessagesPerConnection: (SingleResponseConnection) => Process[Task, Unit] = (cxn) => { + //getMessagesWithPreamble(cxn.nextMessage).toSource pipe text.utf8Encode to cxn.stream + getMessages(cxn.nextMessage).toSource pipe text.utf8Encode to cxn.stream + } + + + val readFromResource: (String) => Unit = getConfigFromResource _ andThen consumeMessages(ConnectionService.createConnection(_), getMessagesPerConnection) + val readFromFile: (String) => Unit = getConfigsFromFile _ andThen consumeMessages(ConnectionService.createConnection(_), getMessagesPerConnection) + - def consumeMessages(getCxn: Config => Cxn, getMessages: Cxn => Process[Task, Unit])(c: Configurations): Unit = { + def consumeMessages(getCxn: Config => SingleResponseConnection, fxMessages: (SingleResponseConnection) => Process[Task, Unit])(c: Configurations): Unit = { c.configs.map(getCxn) foreach { cxn => { - getMessages(cxn).run.run + fxMessages(cxn).run.run cxn.disconnect() + } } - } + } - logger.info(s"Done receiving ${c.name} messages") + def subscribeMessages(createSubscriptionConnection: Config => SubscriptionConnection, onReceiveFun:Config => OnReceive)(c: Configurations): Unit = { + c.configs.map(config => createSubscriptionConnection(config).subscription(onReceiveFun(config))) - logger.info(s"""When you're done testing, run "R.done("${c.name}") to delete the following Rabbit queues:""") - c.configs.foreach { config => - logger.info(s"- ${config.getString("queue")}") - } } - private def getMessages(nextMessage: () => RabbitResponse): Process0[String] = - Process(jsonPreamble) ++ - (receiveAll(nextMessage) map (_.spaces2) intersperse ",") ++ - Process(jsonPostamble) - def receiveAll(nextMessage: () => RabbitResponse): Process0[Json] = + + @Deprecated + def getMessagesWithPreamble(nextMessage: () => RabbitResponse): Process0[String] = getMessages(nextMessage) + // Process(jsonPreamble) ++ getMessages(nextMessage) ++ Process(jsonPostamble) + + + def getMessages(nextMessage: () => RabbitResponse): Process0[String] = + receiveAllAny(nextMessage) match { + case ss:Process0[String] => ss intersperse("\n") + case kk:Process0[Json] => kk map(_.spaces2) intersperse "," + } + +// +// Process(jsonPreamble) ++ +// (receiveAll(nextMessage) map (_.spaces2) intersperse ",") ++ +// Process(jsonPostamble) + +// def receiveAll(nextMessage: () => RabbitResponse): Process0[Json] = +// nextMessage() match { +// case RabbitMessage(json) => Process.emit(json) ++ receiveAll(nextMessage) +// case NoMoreMessages => Process.halt +// } + + def receiveAllAny(nextMessage: () => RabbitResponse): Process0[Any] = nextMessage() match { - case RabbitMessage(json) => Process.emit(json) ++ receiveAll(nextMessage) - case NoMoreMessages => Process.halt + case RabbitJsonMessage(json) => Process.emit(json) ++ receiveAllAny(nextMessage) + case RabbitPlainMessage(plainPayload) => Process.emit(plainPayload) ++ receiveAllAny(nextMessage) + case _ => Process.halt } } diff --git a/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala new file mode 100644 index 0000000..b09608b --- /dev/null +++ b/src/main/scala/com/ppb/rabbitconsumer/examples/RabbitExampleApp.scala @@ -0,0 +1,79 @@ +package com.ppb.rabbitconsumer.examples + +import java.io.File +import java.text.SimpleDateFormat + +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ +import com.ppb.rabbitconsumer.{ConnectionService, _} +import com.typesafe.config.{Config, ConfigFactory} + +import scala.collection.JavaConverters._ +import scalaz.stream.{Process, Process0, text, _} + +object RabbitExampleApp extends App { + + + + + def getConfigsFromFile(configFile: File): Configurations = + Configurations(configFile.getName, ConfigFactory.parseFile(configFile).getConfigList("amqp.connections").asScala.toList) + + + val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + + val onReceiveFun: (Config) => OnReceive = (config) => { + + lazy val outStream = Option(ConfigService.getFilename(config)).getOrElse(None) match { + case file:String => {io.fileChunkW(file, 4096, true)} + case None => {io.stdOutBytes} + } + + val myReceiveFun:OnReceive = (timestamp, queueName, response, header) => { + println(" Called on onReceive callback " + queueName + " response " + response) + + val headerStream = Option(header).getOrElse(None) match { + case headerVal => Process("Header={"+headerVal.toString+"}") + case None=> Process.emit("") + } + + + val responseStream = response.getOrElse(NoMoreMessages) match { + case RabbitPlainMessage(message) => Process.emit( message) + case RabbitJsonMessage(json) => Process.emit(json.spaces2) + case RabbitException(th) => Process.emit(th.toString) + case _ => Process.halt + } + val partialProc:Process0[String] = Process(dateFormat.format(timestamp)) ++ headerStream ++ responseStream ++ Process("\n") + + (partialProc.intersperse("\t").toSource pipe text.utf8Encode to outStream).run.run + } + + myReceiveFun + } + + + val doSubscribe:(File) => Unit = getConfigsFromFile _ andThen RabbitConsumer.subscribeMessages(ConnectionService.subscribe(_) , onReceiveFun ) + + val exitUsage:(String) => Unit = (message ) => { + println(s"${message} \n\t valid command line arguments are \n") + System.exit(0); + } + + + + + + println("\n\n\t Copyright (c) \n") + println(s"\t Version := ${System.getProperty("prog.version")} , revision := ${System.getProperty("prog.revision")}\n ") + println(" ============> args "+ args(0)) + if (args.length == 0) exitUsage("") + + if (args.length == 1) { + println("\n\n\t Rabbit Consumer App") + RabbitConsumer.readFromFile + } else { + println("\n\n\t Rabbit Subscriber App") + doSubscribe(new File(args(0))) + } + +} diff --git a/src/test/resources/local.conf b/src/test/resources/local.conf index 6afe7df..4a1c995 100644 --- a/src/test/resources/local.conf +++ b/src/test/resources/local.conf @@ -9,7 +9,7 @@ amqp { exchangeName = "myExchange" queue = "myQueue" routingKey = "myRoutingKey" - fileName = "~/output1.json" + fileName = "target/output1.json" }, { ip = "localhost" @@ -20,7 +20,7 @@ amqp { exchangeName = "myExchange" queue = "myQueue" routingKey = "someQueue" - fileName = "~/output2.json" + fileName = "target/output2.json" } ] } diff --git a/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala index 53cd91c..6231ada 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/ConfigServiceSpec.scala @@ -5,6 +5,7 @@ import org.scalatest.{FlatSpec, Matchers} import scala.collection.JavaConverters._ + class ConfigServiceSpec extends FlatSpec with Matchers { behavior of "ConfigService" diff --git a/src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala new file mode 100644 index 0000000..15f9fae --- /dev/null +++ b/src/test/scala/com/ppb/rabbitconsumer/OnReceiveSpec.scala @@ -0,0 +1,67 @@ +package com.ppb.rabbitconsumer + +import java.text.SimpleDateFormat +import java.util +import java.util.{Calendar, Date} +import java.util.concurrent.Executors + +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra.{OnReceive, RabbitJsonMessage, RabbitPlainMessage, RabbitResponse} +import org.scalatest.FlatSpec +import argonaut._ +import Argonaut._ +import com.rabbitmq.client.BasicProperties +import org.scalatest.mockito.MockitoSugar +import org.mockito.Mockito._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success, Try} +import scalaz.concurrent.Future +import com.ppb.rabbitconsumer.RabbitConnection.ScalaHeaderProperties + +class OnReceiveSpec extends FlatSpec { + + val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + + val onMessageReceive : OnReceive = (timestamp, queue, response, header) => { + println(dateFormat.format(timestamp)+" Got Message from Queue " + queue) + println(" \t Header " + header) + println(" \t Message "+ response.getOrElse("ERRRRRRRR").toString ) + } + + def registerListener(queueName:String, messageParser: MessageParser = PlainMessageParser): OnReceive => String = { + + implicit val context = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + lazy val definedConsumer:OnReceive => String = (onReceiveFun) => { + println("inside definedConsumer") + val tryMsg = Try( RabbitPlainMessage("MESSAGE") ) + val header:java.util.Map[String,AnyRef] = new util.HashMap[String, AnyRef](); + header.put("key1",new String("value1")) + header.put("key2",new String("value2")) + header.put("key3",null) + + val properties: BasicProperties = MockitoSugar.mock[BasicProperties] + when(properties.getHeaders).thenReturn(header) + when(properties.getTimestamp).thenReturn(Calendar.getInstance().getTime()) + + val propertiesNullHeader: BasicProperties = MockitoSugar.mock[BasicProperties] + when(propertiesNullHeader.getHeaders).thenReturn(null) + + val f: Future[Unit] = Future { + onReceiveFun(properties.getTimestamp , "QueueName", tryMsg, properties.headerAsScalaMap ) + } + + f.start.after(1000) + + "CONNECTED" + } + definedConsumer; + } + + + + println("Testing the OnReceiveSpec") + //val connection = registerListener("queue").(onMessageReceive) + // println( connection ); + +} diff --git a/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala index 4fa4306..acadbee 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/RabbitConnectionSpec.scala @@ -1,5 +1,6 @@ package com.ppb.rabbitconsumer +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ import com.rabbitmq.client.AMQP.Queue.{BindOk, DeleteOk} import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk import com.rabbitmq.client.{Channel, Connection} @@ -87,8 +88,8 @@ class RabbitConnectionSpec extends FlatSpec with Matchers with MockitoSugar { val rabbitConnection = RabbitConnection(connection, channel, nextMessage) - val message: RabbitResponse = RabbitConnection.nextPayload("someQueue")(rabbitConnection) - message shouldBe a[RabbitMessage] + val message: RabbitResponse = RabbitConnection.readNextPayload("someQueue", JSONMessageParser)(rabbitConnection) + message shouldBe a[RabbitJsonMessage] } it should "return a NoMoreMessages object from the RabbitMq Broker when no payload is available" in { @@ -99,7 +100,7 @@ class RabbitConnectionSpec extends FlatSpec with Matchers with MockitoSugar { val rabbitConnection = RabbitConnection(connection, channel, nextMessage) - val message: RabbitResponse = RabbitConnection.nextPayload("someQueue")(rabbitConnection) + val message: RabbitResponse = RabbitConnection.readNextPayload("someQueue")(rabbitConnection) message shouldBe NoMoreMessages } diff --git a/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala b/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala index 3a71635..ed8cee2 100644 --- a/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala +++ b/src/test/scala/com/ppb/rabbitconsumer/RabbitConsumerSpec.scala @@ -2,6 +2,7 @@ package com.ppb.rabbitconsumer import argonaut._ import Argonaut._ +import com.ppb.rabbitconsumer.RabbitConsumerAlgebra._ import com.typesafe.config.Config import org.scalatest.mockito.MockitoSugar import org.scalatest.{FlatSpec, Matchers} @@ -21,12 +22,12 @@ class RabbitConsumerSpec extends FlatSpec with Matchers with MockitoSugar { behavior of "RabbitConsumer" it should "receive all messages" in new RabbitConsumerFixture { - val message = RabbitConsumer.receiveAll(receiveOneMessage).toSource.runLog.run + val message = RabbitConsumer.receiveAllAny(receiveOneMessage).toSource.runLog.run message should have size 1 } it should "read configuration files" in { - val config: Configurations = RabbitConsumer.getConfigs("local") + val config: Configurations = RabbitConsumer.getConfigFromResource("local") config.name should be ("local") config.configs should have size 2 } @@ -39,14 +40,13 @@ class RabbitConsumerSpec extends FlatSpec with Matchers with MockitoSugar { } val myMock = mock[MyMock] - val getCxn: Config => Cxn = _ => Cxn("", () => NoMoreMessages, () => { myMock.soWasI(); Success(()) }) - - val getMessages: Cxn => Process[Task, Unit] = _ => { + val getCxn: Config => SingleResponseConnection = _ => SingleResponseConnection(() => NoMoreMessages, () => { myMock.soWasI(); Success(()) }) + val getMessages: SingleResponseConnection => Process[Task, Unit] = _ => { myMock.iWasCalled() Process(()).toSource } - val configs: Configurations = RabbitConsumer.getConfigs("local") + val configs: Configurations = RabbitConsumer.getConfigFromResource("local") RabbitConsumer.consumeMessages(getCxn, getMessages)(configs) verify(myMock, times(2)).iWasCalled() @@ -62,7 +62,7 @@ trait RabbitConsumerFixture { val receiveOneMessage: () => RabbitResponse = () => if (times == 0) { times = 1 - RabbitMessage("".asJson) + RabbitJsonMessage("".asJson) } else { NoMoreMessages }