Introduction
Let's walk through a simple use case: printing the documents of a collection.
We assume you have a running MongoDB instance. If not, download the latest MongoDB binaries and unzip the archive. Then you can launch the database:
$ cd mongodb-osx-x86_64-2.0.6
$ mkdir data
$ ./bin/mongod --dbpath data

This will start a standalone MongoDB instance that stores its data in the data directory and listens on TCP port 27017.
There is a Maven repository at https://bitbucket.org/sgodbillon/repository/raw/master/snapshots/.
If you use SBT, you just have to edit your build.sbt and add the following:
resolvers += "sgodbillon" at "https://bitbucket.org/sgodbillon/repository/raw/master/snapshots/"
libraryDependencies ++= Seq(
"org.asyncmongo" %% "mongo-async-driver" % "0.1-SNAPSHOT"
)

You can get a connection to a server (or a replica set) like this:
def test() {
import org.asyncmongo.api._
val connection = MongoConnection( List( "localhost:27017" ) )
val db = DB("plugin", connection)
val collection = db("acoll")
}

The connection reference manages a pool of connections. You can provide a list of one or more servers; the driver will figure out whether it is a standalone server or a replica set configuration. Even with only one replica node given, the driver will probe for the other nodes and add them automatically.
package foo
import org.asyncmongo.api._
import org.asyncmongo.bson._
import org.asyncmongo.handlers.DefaultBSONHandlers._
import play.api.libs.iteratee.Iteratee
object Samples {
def listDocs() = {
val connection = MongoConnection( List( "localhost:27017" ) )
val db = DB("plugin", connection)
val collection = db("acoll")
val futureCursor = collection.find(Bson("name" -> BSONString("Jack")))
Cursor.enumerate(futureCursor)(Iteratee.foreach { doc =>
println("found document: " + DefaultBSONIterator.pretty(doc))
})
}
}

The above code deserves some explanation.
First, let's take a look at the collection.find signature:
def find[T, U, V](query: T, fields: Option[U] = None, skip: Int = 0, limit: Int = 0, flags: Int = 0)(implicit writer: BSONWriter[T], writer2: BSONWriter[U], handler: BSONReaderHandler, reader: BSONReader[V]) :Future[Cursor[V]]

The find method allows you to pass any query object of type T, provided that there is an implicit BSONWriter[T] in scope. BSONWriter[T] is a typeclass whose instances implement a write(document: T) method that returns a ChannelBuffer:
trait BSONWriter[DocumentType] {
def write(document: DocumentType) :ChannelBuffer
}

BSONReader[V] is the opposite typeclass. It is typically a deserializer that takes a ChannelBuffer and returns an instance of V:
trait BSONReader[DocumentType] {
def read(buffer: ChannelBuffer) :DocumentType
}

These two typeclasses allow you to provide different de/serializers for different types.
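To make the typeclass pattern concrete, here is a minimal, self-contained sketch of the same idea. It is an analogy, not the driver's actual code: Array[Byte] stands in for Netty's ChannelBuffer, and Person is a hypothetical document type.

```scala
object TypeclassDemo {
  // Simplified analogues of the driver's typeclasses:
  // Array[Byte] stands in for Netty's ChannelBuffer.
  trait Writer[T] { def write(document: T): Array[Byte] }
  trait Reader[T] { def read(buffer: Array[Byte]): T }

  // A hypothetical document type.
  case class Person(name: String)

  // Typeclass instances for Person: serialize the name as UTF-8 bytes.
  implicit val personWriter: Writer[Person] = new Writer[Person] {
    def write(document: Person): Array[Byte] = document.name.getBytes("UTF-8")
  }
  implicit val personReader: Reader[Person] = new Reader[Person] {
    def read(buffer: Array[Byte]): Person = Person(new String(buffer, "UTF-8"))
  }

  // A function in the style of find: it works for any T that has
  // the required typeclass instances in scope.
  def roundTrip[T](doc: T)(implicit w: Writer[T], r: Reader[T]): T =
    r.read(w.write(doc))

  def main(args: Array[String]): Unit =
    println(roundTrip(Person("Jack"))) // prints Person(Jack)
}
```

Calling roundTrip with a type that has no Writer/Reader instances fails at compile time, which is exactly the guarantee find gives you for its query parameter.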
For this example, we don't need to write specific handlers, so we use the default ones by importing org.asyncmongo.handlers.DefaultBSONHandlers._.
Among DefaultBSONHandlers is a BSONWriter[Bson] that handles the driver's built-in BSON library.
You may have noticed that collection.find returns a Future[Cursor[V]]. In fact, everything in MongoAsync is both non-blocking and asynchronous. That means each time you make a query, the only immediate result you get is a future of the result, so the current thread is not blocked waiting for its completion. You no longer need n threads to process n database operations at the same time.
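The principle can be illustrated with the standard library's futures (this uses scala.concurrent.Future, which may differ from the Future type the driver actually returns, and findNames is a made-up stand-in for a database call):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Stand-in for a database call: it returns a Future immediately,
  // and the work happens on another thread.
  def findNames(): Future[List[String]] = Future {
    List("Jack", "Jill") // pretend this batch came from MongoDB
  }

  def main(args: Array[String]): Unit = {
    // No blocking here: map only registers a callback on the future.
    val pretty: Future[String] = findNames().map(_.mkString(", "))

    // We block only for the demo, to observe the result before exiting.
    println(Await.result(pretty, 5.seconds)) // prints: Jack, Jill
  }
}
```

The point is that the calling thread returns immediately after findNames() and map; the actual work and the callback both run asynchronously on the execution context.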
When a query matches too many documents, MongoDB sends only a first batch of them and creates a cursor for fetching the rest. The question is: how do we handle this in a non-blocking, asynchronous, yet elegant way?
That's where the Enumerator/Iteratee pattern (or immutable Producer/Consumer pattern) comes to the rescue!
Let's consider the next statement:
Cursor.enumerate(futureCursor)(Iteratee.foreach { doc =>
println("found document: " + DefaultBSONIterator.pretty(doc))
})

The method Cursor.enumerate[T](Future[Cursor[T]]) returns an Enumerator[T]. Enumerators can be seen as producers of data: their job is to emit chunks of data as they become available. In this case, we get a producer of documents whose source is a future cursor.
Now that we have the producer, we need to define how the documents are processed: that is the Iteratee's job. Iteratees are the opposite of Enumerators: they are consumers, fed by enumerators, that perform some computation with the chunks they receive.
Here, we write a very simple Iteratee: each time it gets a document, it makes a readable, JSON-like description of the document and prints it on the console. Note that none of these operations are blocking: when the running thread is not processing the callback of our iteratee, it can be used to compute other things.
When this snippet is run, we get the following:
found document: {
_id: BSONObjectID["4f899e7eaf527324ab25c56b"],
name: BSONString(Jack)
}
found document: {
_id: BSONObjectID["4f899f9baf527324ab25c56c"],
name: BSONString(Jack)
}
found document: {
_id: BSONObjectID["4f899f9baf527324ab25c56d"],
name: BSONString(Jack)
}
found document: {
_id: BSONObjectID["4f8a269aaf527324ab25c56e"],
name: BSONString(Jack)
}
found document: {
_id: BSONObjectID["4f8a269aaf527324ab25c56f"],
name: BSONString(Jack)
}
found document: {
_id: BSONObjectID["4fa15559af527324ab25c570"],
name: BSONString(Jack)
}