1. Introduction

Elasticsearch is a distributed, RESTful search and analytics engine at the heart of the Elastic Stack. Over the years, many libraries have been developed to interact with an Elasticsearch cluster.

In this tutorial, we’re going to see how to use elastic4s to work with Elasticsearch in Scala applications.

2. What Is Elasticsearch?

Elasticsearch centrally stores our data so we can search, index, and analyze information of all shapes and sizes, such as application logs or metrics. It accomplishes this by storing it in the form of text documents. When the Elasticsearch cluster is composed of more than one node, the documents are distributed across the cluster and can be accessed from any node.

Collections of documents form the so-called indexes. As a matter of fact, indexing is the process of adding documents to an Elasticsearch cluster.

Elasticsearch provides us with a powerful REST API that we can use via HTTP. It also comes with a variety of built-in clients for a number of programming languages, such as Java or Python. In the Scala ecosystem, we can use the elastic4s library.

3. Elasticsearch in Scala with elastic4s

elastic4s is a Scala client for Elasticsearch. While we can use the official Java client as well, the resulting code is more verbose and cannot leverage Scala core classes, such as Either and Future. For example, elastic4s returns Option instead of null. The library also supports Scala collections, so we don’t need to use Java ones, and it makes use of Scala’s Duration instead of Strings and Longs to represent time values.

To use elastic4s in our project, we’ll need to import two libraries to our project using sbt:

"com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % "8.8.1"
"com.sksamuel.elastic4s" %% "elastic4s-core" % "8.8.1"

To use the DSL, we need to add the import statement:

import com.sksamuel.elastic4s.ElasticDsl._

By default, elastic4s uses Scala Futures when returning responses. However, elastic4s supports many other ways to deal with effects, such as ZIO, Cats-Effect, and ScalaZ.

In the following sections, we’re going to see how to use elastic4s to work with Elasticsearch in Scala.

3.1. Creating an elastic4s Client

The first step to using elastic4s is to create a client:

val client = ElasticClient(
  JavaClient(ElasticProperties("http://localhost:9200"))
)

In the example above, we created a new ElasticClient by wrapping a JavaClient. Additionally, we configured the latter with the endpoint of our Elasticsearch cluster. If we had multiple nodes, instead of only one, we could pass multiple endpoints:

ElasticProperties("http://host1:9200,host2:9200,host3:9200")

We must close the client explicitly at the end of our application:

client.close()

3.2. Creating an Index

After connecting to an Elasticsearch cluster, we can start to send commands to it. Let’s create an index to make our cluster ready to receive documents:

client.execute {
  createIndex("activities").mapping(
    properties(
      TextField("username"),
      DateField("when"),
      IpField("ip"),
      TextField("action")
    )
  )
}.await

The example above models an index to keep track of user activity. For each activity, we track the username of the user that performed it, the action they performed, when it was performed, and from which IP address (ip).

client.execute() is part of the DSL and tells elastic4s to execute an operation on the cluster that the client is connected to. In this case, such an operation is CreateIndexApi::createIndex(). First, we define the name of the index, activities in this case. Then, we define the properties of the index. elastic4s gives us many different property types. This example only shows TextField, DateField, and IpField, but we can use many more (such as IntField or ShortField for integral numbers).

ElasticClient::execute() only allows us to perform a single call to Elasticsearch. In this configuration, it returns a Future on which we invoke await to wait for the operation to complete. This is fine for testing purposes, but in an application, we’d likely want to operate asynchronously on the returned value.

3.3. Listing Indexes

We can list all the indexes defined in our cluster by invoking CatsApi::catIndices(). Let’s see how to use it to verify that our activities index was created correctly:

client.execute {
  catIndices()
}.await match {
  case failure: RequestFailure => println(failure.error)
  case response: Response[Seq[CatIndicesResponse]] => println(response.result.exists(_.index == "activities"))
}

This time we pattern-match on the value returned by await, which might either be a failure (RequestFailure) or a success. In the example, we just filter the response list to check if there’s an index named activities, but we could also print its information (such as the health status).

There’s a simpler way to verify that the index was created – the IndexAdminApi::indexExists() operation:

client.execute {
  indexExists("activities")
}.await match {
  case failure: RequestFailure => println(failure.error)
  case response: RequestSuccess[IndexExistsResponse] => println(response.result.exists)
}

In this case, the response we want is of type IndexExistsResponse, and we can verify that the activities index exists by printing the field result.exists of the response.

3.4. Creating Documents in an Index

We can now write some documents in the activities index we just created:

client.execute {
  indexInto("activities").fields(
    "username" -> "thomas",
    "when" -> Instant.now,
    "ip" -> "192.168.197.123",
    "action" -> "GetArticles"
  ).refresh(RefreshPolicy.Immediate)
}.andThen {
  case Success(_) =>
    client.execute(
      indexInto("activities").fields(
        "username" -> "robert",
        "when" -> Instant.now,
        "ip" -> "192.168.197.103",
        "action" -> "DeleteArticle"
      ).refreshImmediately
    )
}.await

IndexApi::indexInto() allows us to write documents into an index. In the example above, we call it twice to add two activities, one for the user thomas and one for robert. In this case, we leverage the composability of Future to chain two operations.

IndexRequest::fields() lets us specify a document as a sequence of pairs (String, Any). There are other possibilities. For example, we can use IndexRequest::doc() to write a JSON document instead of a sequence of pairs.

Lastly, we always specify .refresh(RefreshPolicy.Immediate) after adding a document to an index. This tells Elasticsearch to refresh the cluster so that we can see the new document immediately. Since refreshing immediately is fairly common, we can also specify .refreshImmediately instead of .refresh(RefreshPolicy.Immediate).

3.5. Listing Documents in an Index

To verify that we actually just created documents in our index, we can invoke the SearchApi::search() operation:

client.execute {
  search("activities")
}.await match {
  case failure: RequestFailure => println(failure.error)
  case response: RequestSuccess[SearchResponse] =>
    val searchHits = response.result.hits.hits.toList
    searchHits.foreach { searchHit =>
      println(s"${searchHit.id} -> ${searchHit.sourceAsMap}")
    }
}

The structure of the result is a bit complex. In particular, in order to get to the actual results, we have to access response.result.hits.hits. This will give us an Array[SearchHit], where SearchHit contains various useful information regarding the documents, such as the ID and the fields in the document.

In the example above, we’re printing the ID and the fields of the document represented as a Map. Hence, running the example above will print the following lines:

27P-sn0BEtaLvWE5iFPY -> Map(username -> thomas, when -> 2021-12-13T08:52:45.383498Z, ip -> 192.168.197.123, action -> GetArticles)
3LP-sn0BEtaLvWE5iVMO -> Map(username -> robert, when -> 2021-12-13T08:52:45.449883Z, ip -> 192.168.197.103, action -> DeleteArticle)

3.6. Deleting Documents and Indexes

Lastly, let’s see how to delete documents from an index:

client.execute {
  deleteByQuery(
    "activities",
    termQuery("username", "robert")
  ).refreshImmediately
}

In this case, we deleted all the documents where the field username was set to robert. DeleteApi::deleteByQuery() allows us to filter the documents in an index and to delete only those that match a given condition.

Alternatively, if we knew the ID of the document to delete, we could call DeleteApi::deleteById(). Again, in order to refresh immediately the documents in the index, we composed DeleteApi::deleteByQuery() with refreshImmediately.

If we now print all the documents in the index as above, we’ll get only that of thomas:

27P-sn0BEtaLvWE5iFPY -> Map(username -> thomas, when -> 2021-12-13T08:52:45.383498Z, ip -> 192.168.197.123, action -> GetArticles)

DeleteIndexApi::deleteIndex() lets us delete an entire index, including all of its documents:

client.execute {
  deleteIndex("activities")
}

By specifying _all as an index name, we can delete all the indexes in the cluster.

4. Conclusion

In this article, we saw how to use elastic4s to work with Elasticsearch in Scala. We took a look at various operations, from creating indexes to writing documents in them and from searching to deleting documents. We also took a look at how Futures can be used as a simple way to invoke such operations on a real Elasticsearch cluster.

As usual, the code is available over on GitHub.

2 Comments
Oldest
Newest
Inline Feedbacks
View all comments
Comments are closed on this article!