1. Overview

gRPC is a schema-first RPC framework: services and messages are described in Protocol Buffers (.proto) files, and calls travel over HTTP/2, which allows requests and responses to be streamed. Implementations exist for many programming languages. In this tutorial, we’ll use Akka gRPC to implement a gRPC server and client in Scala.

2. Akka gRPC

Akka gRPC builds on Akka Streams and Akka HTTP: streaming calls are modeled as Akka Streams Sources, and the server runs on Akka HTTP.

2.1. Setting up the Dependencies

First of all, we need to add the sbt-akka-grpc plugin to our project/plugins.sbt file:

addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.2.1")

Then, we need to enable it in our build.sbt file:

enablePlugins(AkkaGrpcPlugin)
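The plugin can also be told explicitly which sources to generate. The fragment below is a sketch for build.sbt that uses the akkaGrpcGeneratedLanguages and akkaGrpcGeneratedSources settings of sbt-akka-grpc. We assume the values shown match the plugin’s defaults for a Scala project, so the fragment is optional and mainly useful as a starting point when only one side (client or server) is needed:

```scala
// build.sbt (optional): make the code generation choices explicit
enablePlugins(AkkaGrpcPlugin)

// Generate Scala sources from the .proto files
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala)

// Generate both the client stub and the server handler;
// use Seq(AkkaGrpc.Client) or Seq(AkkaGrpc.Server) to generate just one side
akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client, AkkaGrpc.Server)
```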

2.2. Defining the .proto File

The next step is to define our protobuf definitions within a .proto file. We need to add the file within the src/main/protobuf folder of our project. For this example, we’re going to define a simple message exchange service with both request and response entities:

syntax = "proto3";

import "google/protobuf/timestamp.proto";

option java_package = "com.baeldung.scala.akka_2.grpc";

package akka2grpc;

message MessageRequest {
  string id = 1;
  string message = 2;
  optional google.protobuf.Timestamp timestamp = 3;
  repeated string extra_info = 4;
}

message MessageResponse {
  string id = 1;
  string response_message = 2;
  optional google.protobuf.Timestamp timestamp = 3;
  repeated string extra_info = 4;
}

service MessageExchangeService {

  rpc SendMessage(MessageRequest) returns (MessageResponse) {}

  rpc StreamMessagesSingleResponse(stream MessageRequest) returns (MessageResponse) {}

  rpc SendMessageStreamResponse(MessageRequest) returns (stream MessageResponse) {}

  rpc StreamMessages(stream MessageRequest) returns (stream MessageResponse) {}
}

In the file above, we’ve defined a MessageRequest that contains an id, a message, an optional Timestamp field, and a repeated string field (extra_info) for attaching extra information if needed. MessageResponse has the same structure, except that its text field is named response_message.

To define the Timestamp fields, we had to import google/protobuf/timestamp.proto, one of the well-known types that ship with Protocol Buffers rather than with gRPC itself. We’ve also set the package of the generated classes with the java_package option. Protocol Buffers supports many other field types that we can use in our messages.

After the message definitions, we’ve defined our remote calls. gRPC supports both single (unary) and streaming requests and responses. In the example above, we’ve defined all four combinations: a unary call (SendMessage), a client-streaming call (StreamMessagesSingleResponse), a server-streaming call (SendMessageStreamResponse), and a bidirectional-streaming call (StreamMessages).

Finally, we need to compile our project to generate our service definition classes:

sbt compile
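To get a feel for what the compile step produces, here is a hand-written sketch of the shape of the Scala message classes, assuming the default ScalaPB-based code generation. It is a simplified stand-in rather than the actual generated source, and the GeneratedShapeSketch object is a hypothetical name used only for illustration:

```scala
// Hand-written, simplified stand-ins: NOT the real generated sources.
object GeneratedShapeSketch {
  // Stand-in for the Scala class generated for google.protobuf.Timestamp
  final case class Timestamp(seconds: Long = 0L, nanos: Int = 0)

  // proto3 scalar fields get default values, a message-typed field becomes
  // an Option, a repeated field becomes a Seq, and snake_case field names
  // (extra_info) become camelCase (extraInfo).
  final case class MessageRequest(
    id: String = "",
    message: String = "",
    timestamp: Option[Timestamp] = None,
    extraInfo: Seq[String] = Seq.empty
  )
}
```

This mapping is why the Scala code later in the article refers to extraInfo and responseMessage and wraps the timestamp in Some or None.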

2.3. Creating the Server

Now that we have all the necessary classes to create our service, let’s create our server. First, we’ll define a class that extends the generated MessageExchangeService trait:

class MessageExchangeServiceImpl(implicit val mat: Materializer)
  extends MessageExchangeService

After that, we can start implementing our calls. We’ll first create a helper method that builds a protobuf Timestamp for the current time:

private def getTimestamp: Timestamp = {
  val time = Instant.now()
  Timestamp.of(time.getEpochSecond, time.getNano)
}
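A protobuf Timestamp stores whole seconds since the Unix epoch plus a nanosecond remainder, which is exactly what Instant exposes through getEpochSecond and getNano. The sketch below shows the conversion in both directions using only the standard library. Ts is a hypothetical stand-in for the generated Timestamp class, and TimestampSketch is an illustrative name:

```scala
import java.time.Instant

object TimestampSketch {
  // Hypothetical stand-in for the generated Timestamp class
  final case class Ts(seconds: Long, nanos: Int)

  // Whole seconds since the epoch plus the nanosecond-of-second remainder
  def fromInstant(time: Instant): Ts = Ts(time.getEpochSecond, time.getNano)

  // Inverse conversion, e.g. for reading the timestamp of a response
  def toInstant(ts: Ts): Instant =
    Instant.ofEpochSecond(ts.seconds, ts.nanos.toLong)
}
```

Converting an Instant to a Ts and back yields the original Instant, since both representations carry nanosecond precision.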

Then, we can continue by implementing the simplest of our calls:

override def sendMessage(
  receivingMessage: MessageRequest
): Future[MessageResponse] = {
    val response = MessageResponse(
      id = UUID.randomUUID().toString,
      responseMessage = s"Responding to ${receivingMessage.message}",
      timestamp = Some(getTimestamp),
      extraInfo = receivingMessage.extraInfo
    )
    Future.successful(response)
}
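The handler above wraps a value that is already computed in Future.successful. As a standard-library sketch of that choice (the CompletedFutureSketch object and its method names are hypothetical), the first method below returns a Future that is complete from the start, while the second schedules its body on an ExecutionContext, which is the shape we’d want if the handler did real work such as a database call:

```scala
import scala.concurrent.{ExecutionContext, Future}

object CompletedFutureSketch {
  // Already completed: nothing is scheduled and no ExecutionContext is needed
  def immediate(message: String): Future[String] =
    Future.successful(s"Responding to $message")

  // The body runs on the supplied ExecutionContext
  def deferred(message: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"Responding to $message")
}
```

Both methods produce the same value; they differ only in where and when the value is computed.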

We generate a random id for our MessageResponse, build a simple response message, set the current timestamp, and pass the request’s extraInfo through unchanged. No asynchronous computation takes place, so we can return an already completed Future. Moving on with our implementations, we’ll define the streamMessagesSingleResponse call:

override def streamMessagesSingleResponse(
  receivingMessageStream: Source[MessageRequest, NotUsed]
): Future[MessageResponse] =
    receivingMessageStream
      .runWith(Sink.seq)
      .map(messages =>
        MessageResponse(
          id = UUID.randomUUID().toString,
          responseMessage =
            s"Responding to stream ${messages.map(_.message).mkString(", ")}",
          timestamp = Some(getTimestamp),
          extraInfo = messages.flatMap(_.extraInfo)
        )
      )
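To make the aggregation step concrete, the sketch below applies the same logic to a plain Seq, which is what the function passed to map receives once the request stream has been collected. Req is a hypothetical stand-in for the generated MessageRequest, and AggregationSketch is an illustrative name:

```scala
object AggregationSketch {
  // Hypothetical stand-in for the generated MessageRequest
  final case class Req(message: String, extraInfo: Seq[String])

  // Joins all messages into one response text and flattens all
  // extraInfo lists into a single sequence, preserving arrival order
  def summarize(messages: Seq[Req]): (String, Seq[String]) = {
    val joined = messages.map(_.message).mkString(", ")
    (s"Responding to stream $joined", messages.flatMap(_.extraInfo))
  }
}
```

For two requests with messages "a" and "b" and extra info Seq("x") and Seq("y", "z"), summarize returns the text "Responding to stream a, b" together with Seq("x", "y", "z").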

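We receive a Source of messages, collect it with Sink.seq, and respond with a single MessageResponse once the request stream completes. Our responseMessage is a concatenation of all the messages from the Source, and the extraInfo lists are concatenated in the same way. Note that Sink.seq holds every element in memory until the stream ends, and that calling map on the resulting Future requires an implicit ExecutionContext in scope (for example, via import mat.executionContext). Next, we implement the sendMessageStreamResponse call: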

override def sendMessageStreamResponse(
  receivingMessage: MessageRequest
): Source[MessageResponse, NotUsed] =
    Source(
      List(
        MessageResponse(
          id = UUID.randomUUID().toString,
          responseMessage = s"Stream responding to ${receivingMessage.message}",
          timestamp = Some(getTimestamp),
          extraInfo = receivingMessage.extraInfo
        )
      )
    )

In this call, we expect to get a single message and then return a Source of MessageResponse. For our example, we return a simple List of one element wrapped in a Source. Finally, we have to implement the most interesting case, the streamMessages call:

override def streamMessages(
  receivingMessageStream: Source[MessageRequest, NotUsed]
): Source[MessageResponse, NotUsed] =
    receivingMessageStream.map(receivingMessage =>
      MessageResponse(
        id = UUID.randomUUID().toString,
        responseMessage = s"Stream responding to ${receivingMessage.message}",
        timestamp = Some(getTimestamp),
        extraInfo = receivingMessage.extraInfo
      )
    )

In this case, we receive a stream of requests and, for each MessageRequest in it, emit a MessageResponse on the outgoing stream. Both the input and the output are Sources, so responses can be emitted as requests arrive instead of waiting for the request stream to complete. Now, we need to register our service in a service handler and bind it to an Akka HTTP server. Both steps require an implicit ActorSystem in scope:

val service: HttpRequest => Future[HttpResponse] =
  MessageExchangeServiceHandler(new MessageExchangeServiceImpl())

def startServer: Future[Http.ServerBinding] =
  Http().newServerAt("127.0.0.1", 8090).bind(service)

Calling startServer binds the server to 127.0.0.1 on port 8090 and returns a Future that completes once the binding is established. For our server to work, we need to enable HTTP/2 functionality in our application.conf file:

akka.http.server.preview.enable-http2 = on

2.4. Creating the Client

Now that we’ve implemented our server functionality, we need to create a client that will send messages to it. First of all, let’s add a definition to our configuration file:

akka.grpc.client {
  "akka2grpc.MessageExchangeService" {
    service-discovery.mechanism = "static"
    host = "localhost"
    port = 8090
    use-tls = false
  }
}

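Under the akka.grpc.client namespace, we add a block keyed by the fully qualified name of our service (the proto package plus the service name) so that the client can find its configuration. Then, we can create our client by loading the settings under that name and passing them to the generated client. Both calls require an implicit ActorSystem in scope: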

private val clientSettings: GrpcClientSettings =
  GrpcClientSettings.fromConfig(MessageExchangeService.name)

private val client = MessageExchangeServiceClient(clientSettings)

After that, we call the client to send the corresponding message for each call:

def sendSingleMessage(message: String): Future[MessageResponse] =
  client.sendMessage(
    MessageRequest(
      id = UUID.randomUUID().toString,
      message = message,
      timestamp = None,
      extraInfo = Seq.empty
    )
  )

def sendSingleMessageStreamResponse(
  message: String
): Source[MessageResponse, NotUsed] =
    client.sendMessageStreamResponse(
      MessageRequest(
        id = UUID.randomUUID().toString,
        message = message,
        timestamp = None,
        extraInfo = Seq.empty
      )
    )

def streamMessagesSingleResponse(
  messages: Source[String, NotUsed]
): Future[MessageResponse] =
    client.streamMessagesSingleResponse(
      messages.map(m =>
        MessageRequest(
          id = UUID.randomUUID().toString,
          message = m,
          timestamp = None,
          extraInfo = Seq.empty
        )
      )
    )

def streamMessages(
  messages: Source[String, NotUsed]
): Source[MessageResponse, NotUsed] =
    client.streamMessages(
      messages.map(m =>
        MessageRequest(
          id = UUID.randomUUID().toString,
          message = m,
          timestamp = None,
          extraInfo = Seq.empty
        )
      )
    )

Our client is ready to communicate with our server.

3. Conclusion

In this article, we’ve demonstrated how to define a gRPC service and implement both a server and a client. As always, the code is available over on GitHub.
