1. Introduction

MongoDB is one of the most popular NoSQL databases today. It uses a BSON(Binary JSON) format to save the data (documents) in collections. For Scala, there are a few drivers for MongoDB. However, ReactiveMongo is the most popular of them all.

In this tutorial, we’ll look at ReactiveMongo and how we can use it to perform queries on MongoDB.

2. About ReactiveMongo

ReactiveMongo is an asynchronous and non-blocking Scala driver for MongoDB. In addition to performing standard CRUD operations, it also supports querying the data as a stream, which helps to process a large amount of data with minimal overhead.

3. Dependencies

First, let’s add the required dependencies to the build.sbt file:

"org.reactivemongo" %% "reactivemongo" % "1.0.3",
"de.flapdoodle.embed" % "de.flapdoodle.embed.mongo" % "3.0.0" % Test

Apart from the ReactiveMongo dependency, we also add a dependency for an embedded MongoDB. This allows us to start an in-memory version of MongoDB for testing.

4. Creating a Connection

Let’s start by creating a connection to MongoDB. We’ll be using a MongoDB URL for connecting to the database. For example, to connect to a local MongoDB database named movies, we can specify the URL as mongodb://localhost:27017/movies:

val mongoDriver = AsyncDriver()
lazy val parsedURIFuture: Future[ParsedURI] = MongoConnection.fromString(mongoURL)
lazy val connection: Future[MongoConnection] = parsedURIFuture.flatMap(u => mongoDriver.connect(u))

To connect to a database, we can use:

val db: Future[DB] = connection.flatMap(_.database(dbName))

With this db instance, we can get an instance to BSONCollection, using which we can execute queries:

val moviesCollection: Future[BSONCollection] = db.map(_.collection(collectionName))

5. Supporting Entities

Let’s create a sample case class to model the MongoDB collection:

case class Movie(name:String, leadActor:String, genre:String, durationInMin: Int)

We need to create implicit variables to map the case class to BSONDocument. These implicit need to be in scope for ReactiveMongo to convert the BSONDocument into/from the case class instances:

implicit def moviesWriter: BSONDocumentWriter[Movie] = Macros.writer[Movie]
implicit def moviesReader: BSONDocumentReader[Movie] = Macros.reader[Movie]

6. Inserting Data

Now, let’s see how we can insert some documents into the MongoDB collection:

<span class="blob-code-inner blob-code-marker" data-code-marker="+"><span class="pl-k">val</span> <span class="pl-smi">movie</span> <span class="pl-k">=</span> <span class="pl-en">Movie</span>(<span class="pl-s"><span class="pl-pds">"</span>The Shawshank Redemption<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>Morgan Freeman<span class="pl-pds">"</span></span>, <span class="pl-s"><span class="pl-pds">"</span>Drama<span class="pl-pds">"</span></span>, <span class="pl-c1">144</span>)</span>
connection.getCollection("Movie").flatMap { col =>
  val insertResultFuture: Future[WriteResult] = col.insert.one(movie)
  insertResultFuture
  }
}

If we want to insert multiple documents together, we can use the many() method instead of one():

val allMovies: Seq[Movie] = getMoviesList()
connection.getCollection("Movie").flatMap { col =>
  col.insert.many(allMovies)
  }
}

7. Fetching Documents from a Collection

Let’s see how we can filter the collections based on a particular field in MongoDB:

val dramaMovies = connection.getCollection("Movie").flatMap(c =>
  c.find(BSONDocument("genre" -> "Drama"))
  .cursor[Movie]().collect(err = Cursor.FailOnError[List[Movie]]()))

If we want to get only a limited number of documents matching a particular condition, we can provide the maximum number of documents to collect. Let’s say we want only the two longest drama movies:

val longestTwoDramas = col.find(BSONDocument("genre" -> "Drama"))
  .sort(BSONDocument("durationInMin" -> -1))
  .cursor[Movie]().collect(2, Cursor.FailOnError[List[Movie]]())

The sort() function sorts the documents based on the provided field. The value -1 corresponds to sorting in descending order, whereas the number 1 corresponds to ascending order.

If we want to get all the documents from the collection without applying any filter, we can use an empty BSONDocument in the find method:

val allMovies = connection.getCollection("Movie")
  .flatMap(_.find(BSONDocument()).cursor[Movie]()
  .collect(err = Cursor.FailOnError[List[Movie]]()))

We can also provide multiple conditions to the query:

val bradPittDramasFuture = connection.getCollection("Movie")
  .flatMap(_.find(
    BSONDocument(
      "genre" -> "Drama", 
      "leadActor" -> "Brad Pitt", 
      "durationInMin" -> BSONDocument("$gt" -> 130))
    ).cursor[Movie]().collect(err = Cursor.FailOnError[List[Movie]]()))

Note that operations like $gt, $gte, $lt, and $lte, which are used for greater-than and less-than filters, need to be wrapped in BSONDocument in the filter condition.

8. Updating a Document

Now, let’s see how we can find a document and update some fields of the same document. For that, we can use the function findAndUpdate:

connection.getCollection("Movie").flatMap { col =>
  val updateStatus = col.findAndUpdate(
    BSONDocument("name" -> "Fight Club"),
    BSONDocument("$set" -> BSONDocument("durationInMin" -> 145))
  )
}

The function findAndUpdate takes two parameters. The first parameter is the filter to select the document. The second parameter is the update to be applied. If the filter operation matches more than one document, the update will be applied to the first match only. To update multiple records, we should create an UpdateBuilder and invoke the method many():

val updateFuture = connection.getCollection("Movie").flatMap { col =>
  val updateBuilder = col.update(true)
  val updates = updateBuilder.element(
    q = BSONDocument("genre" -> "Drama"),
    u = BSONDocument("$set" -> BSONDocument("genre" -> "Dramatic")),
    multi = true
  )
  updates.flatMap(updateEle => updateBuilder.many(Seq(updateEle)))
}

9. Deleting a Document

We can also delete a document from the collection by calling the method findAndRemove(). Let’s delete a movie by name:

col.findAndRemove(BSONDocument("name" -> "Troy"))

10. Streaming Documents Using Akka-Stream

ReactiveMongo now supports the streaming of documents. It processes the data without loading the entire documents into memory. ReactiveMongo has a module for akka-stream integration. To use it, we need to add the reactivemongo-akkastream dependency to our buid.sbt file:

"org.reactivemongo" %% "reactivemongo-akkastream" % "1.0.3"

Now, we can use the streaming API to stream and process the documents from MongoDB. Let’s implement a very simple example of finding out the sum of durations of all the movies in the database. First, we need to create a Source:

val source = col.find(BSONDocument()).cursor[Movie]()
  .documentSource(100, Cursor.FailOnError())

Now, let’s extract the durations for each movie and sum them:

val totalDurationFuture = source.map(_.durationInMin).runWith(Sink.fold(0)(_ + _))

11. Conclusion

In this tutorial, we’ve seen how we can use ReactiveMongo to run queries on MongoDB. We’ve covered the most common and widely used parts of ReactiveMongo.

As always, the sample code is available over on GitHub.

Comments are closed on this article!