Authors Top

We’re starting a new Scala area. If you have a few years of experience in the Scala ecosystem, and you’re interested in sharing that experience with the community, have a look at our Contribution Guidelines.

1. Overview

Server-sent Events (SSE) are a powerful tool for sending data streams from a server to a client.

In this article, we’ll explore how to receive SSE using Alpakka.

2. The akka-stream-alpakka-sse Library

The first thing we need to do is to add the akka-stream-alpakka-sse library to our dependencies. We’ll also need akka-stream and akka-http libraries to achieve communication with a server:

libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-sse" % "3.0.4",
  "com.typesafe.akka" %% "akka-stream" % "2.6.18",
  "com.typesafe.akka" %% "akka-http" % "10.2.7"
)

For our example, we’re going to use Wikimedia’s EventStreams. Then, we’ll create a URI to get the stream of data and a function that will consume it using the HTTP protocol. Since this is an Akka application, all our functions are going to need an implicit argument of ActorSystem type:

val wikiMediaUri: Uri = Uri("https://stream.wikimedia.org/v2/stream/recentchange")
def sendRequest(request: HttpRequest)(implicit actorSystem: ActorSystem) =
  Http().singleRequest(request)

After we’ve defined these arguments, we need to define our event source:

def eventSource(implicit actorSystem: ActorSystem) =
  EventSource(
    uri = wikiMediaUri,
    send = sendRequest,
    initialLastEventId = None,
    retryDelay = 1.second
  )

What we’re actually defining in this function is a Source that will consume from the given URI, using the provided send function. Wikimedia will provide many events per second, so we’ve added a delay of one second before consuming the next event.

Finally, the only thing left is to define a RunnableGraph to consume our events:

def eventsGraph(nrOfSamples: Int)(implicit actorSystem: ActorSystem) =
  eventSource
    .throttle(
      elements = 1,
      per = 1.second,
      maximumBurst = 1,
      ThrottleMode.Shaping
    )
    .take(nrOfSamples)
    .toMat(Sink.seq)(Keep.right)

Here, we’ve defined a RunnableGraph, also adding throttling and keeping a certain number of elements from our Source. After that, we send the elements to a Sink, keeping the materialized value for further processing if needed. The only thing left to do is to run our graph using the run() function.

3. Conclusion

In this article, we’ve demonstrated using the akka-stream-alpakka-sse library to consume Server-sent Events using the HTTP protocol. As always, the code is available over on GitHub.

Authors Bottom

We’re starting a new Scala area. If you have a few years of experience in the Scala ecosystem, and you’re interested in sharing that experience with the community, have a look at our Contribution Guidelines.

Comments are closed on this article!