1. Overview

gRPC is a platform to do inter-process Remote Procedure Calls (RPC). It's highly performant and can run in any environment.

In this tutorial, we'll focus on gRPC error handling using Java. gRPC has very low latency and high throughput, so it's well suited to complex environments like microservice architectures. In these systems, it's critical to have a good understanding of the state, performance, and failures of the different components of the network, and a good error handling implementation is key to getting that visibility.

2. Basics of Error Handling in gRPC

Errors in gRPC are first-class entities, i.e., every call in gRPC results in either a payload message or a status error message.

The errors are codified in status messages and implemented across all supported languages.

In general, we should not include errors in the response payload. To that end, always use StreamObserver::onError, which internally adds the status error to the trailing headers. The only exception, as we'll see below, is when we're working with streams.

All client and server gRPC libraries support the official gRPC error model. Java encapsulates this error model with the class io.grpc.Status. This class requires a standard error status code and accepts an optional string error message to provide additional information. This error model has the advantage that it's supported independently of the data encoding used (protocol buffers or any other format). However, it's pretty limited since we cannot include error details with the status.
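As a rough illustration of the "status code plus optional description" shape of this model, here's a dependency-free sketch. SimpleStatus, codeForNumber, and formatMessage are hypothetical names made up for this example and are not the io.grpc.Status API; only the seventeen canonical code names and their numeric values come from gRPC itself. The fallback to UNKNOWN for unrecognized numbers is an assumption of the sketch.

```java
import java.util.Objects;

/** Hypothetical, dependency-free stand-in for the basic error model; not io.grpc.Status. */
final class SimpleStatus {

    /** Canonical gRPC codes in numeric order, so ordinal() matches the numeric value. */
    enum Code {
        OK, CANCELLED, UNKNOWN, INVALID_ARGUMENT, DEADLINE_EXCEEDED, NOT_FOUND,
        ALREADY_EXISTS, PERMISSION_DENIED, RESOURCE_EXHAUSTED, FAILED_PRECONDITION,
        ABORTED, OUT_OF_RANGE, UNIMPLEMENTED, INTERNAL, UNAVAILABLE, DATA_LOSS,
        UNAUTHENTICATED
    }

    private final Code code;
    private final String description; // optional, may be null

    SimpleStatus(Code code, String description) {
        this.code = Objects.requireNonNull(code, "code");
        this.description = description;
    }

    /** Maps a numeric value to a code; this sketch falls back to UNKNOWN for unrecognized values. */
    static Code codeForNumber(int number) {
        Code[] codes = Code.values();
        return (number >= 0 && number < codes.length) ? codes[number] : Code.UNKNOWN;
    }

    /** Renders "CODE" when there is no description, otherwise "CODE: description". */
    String formatMessage() {
        return description == null ? code.name() : code.name() + ": " + description;
    }
}
```

The "CODE: description" rendering mirrors the exception message that the client test in section 3.1 asserts on. Note that there's no place in this shape to attach structured details, which is the limitation mentioned above.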

If our gRPC application uses protocol buffers for data encoding, then we can use the richer error model for Google APIs. The com.google.rpc.Status class encapsulates this error model. This class provides com.google.rpc.Code values, an error message, and additional error details that are appended as protobuf messages. Additionally, we can utilize a predefined set of protobuf error messages, defined in error_details.proto, that cover the most common cases. In the package com.google.rpc, we have the classes RetryInfo, DebugInfo, QuotaFailure, ErrorInfo, PreconditionFailure, BadRequest, RequestInfo, ResourceInfo, and Help, which encapsulate the error messages in error_details.proto.

In addition to the two error models, we can define custom error messages that can be added as key-value pairs to the RPC metadata.

We're going to write a very simple application to show how to use these error models with a pricing service where the client sends commodity names, and the server provides pricing values.

3. Unary RPC Calls

Let's start by considering the following service interface defined in commodity_price.proto:

service CommodityPriceProvider {
    rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}

message Commodity {
    string access_token = 1;
    string commodity_name = 2;
}

message CommodityQuote {
    string commodity_name = 1;
    string producer_name = 2;
    double price = 3;
}

message ErrorResponse {
    string commodity_name = 1;
    string access_token = 2;
    string expected_token = 3;
    string expected_value = 4;
}

The input of the service is a Commodity message. In the request, the client has to provide an access_token and a commodity_name.

The server responds synchronously with a CommodityQuote that states the commodity_name, producer_name, and the associated price for the Commodity.

For illustration purposes, we also define a custom ErrorResponse. This is an example of a custom error message that we'll send to the client as metadata.

3.1. Response Using io.grpc.Status

In the server's service call, we check the request for a valid Commodity:

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {

    if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
 
        Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
        ErrorResponse errorResponse = ErrorResponse.newBuilder()
          .setCommodityName(request.getCommodityName())
          .setAccessToken(request.getAccessToken())
          .setExpectedValue("Only Commodity1, Commodity2 are supported")
          .build();
        Metadata metadata = new Metadata();
        metadata.put(errorResponseKey, errorResponse);
        responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT
          .withDescription("The commodity is not supported")
          .asRuntimeException(metadata));
        return; // onError ends the call, so nothing else may be sent
    }
    // ...
}

In this simple example, we return an error if the Commodity doesn't exist in the commodityLookupBasePrice lookup table.

First, we build a custom ErrorResponse and create a key-value pair which we add to the metadata in metadata.put(errorResponseKey, errorResponse).

We use io.grpc.Status to specify the error status. The method responseObserver::onError takes a Throwable as a parameter, so we use asRuntimeException(metadata) to convert the Status into a Throwable. asRuntimeException can optionally take a Metadata parameter (in our case, an ErrorResponse key-value pair), which is added to the trailers of the message.

If the client makes an invalid request, it will get back an exception:

@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
 
    Commodity request = Commodity.newBuilder()
      .setAccessToken("123validToken")
      .setCommodityName("Commodity5")
      .build();

    StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));

    assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
    assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
    Metadata metadata = Status.trailersFromThrowable(thrown);
    ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
    assertEquals("Commodity5", errorResponse.getCommodityName());
    assertEquals("123validToken", errorResponse.getAccessToken());
    assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}

The call to blockingStub::getBestCommodityPrice throws a StatusRuntimeException since the request has an invalid commodity name.

We use Status::trailersFromThrowable to access the metadata. ProtoUtils::keyForProto gives us the metadata key of ErrorResponse.

3.2. Response Using com.google.rpc.Status

Let's consider the following server code example:

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
    // ...
    if (!"123validToken".equals(request.getAccessToken())) {

        com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
          .setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
          .setMessage("The access token not found")
          .addDetails(Any.pack(ErrorInfo.newBuilder()
            .setReason("Invalid Token")
            .setDomain("com.baeldung.grpc.errorhandling")
            .putMetadata("insertToken", "123validToken")
            .build()))
          .build();
        responseObserver.onError(StatusProto.toStatusRuntimeException(status));
        return; // onError is terminal, so we make no further calls on responseObserver
    }
    // ...
}

In the implementation, getBestCommodityPrice returns an error if the request doesn't have a valid token.

Moreover, we set the status code, message, and details on com.google.rpc.Status.

In this example, we're using the predefined com.google.rpc.ErrorInfo instead of our custom ErrorResponse (although we could have used both if needed). We serialize ErrorInfo using Any::pack.

The method StatusProto::toStatusRuntimeException converts the com.google.rpc.Status into a Throwable.

In principle, we could also add other messages defined in error_details.proto to further customize the response.

The client implementation is straightforward:

@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
 
    Commodity request = Commodity.newBuilder()
      .setAccessToken("invalidToken")
      .setCommodityName("Commodity1")
      .build();

    StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
      () -> blockingStub.getBestCommodityPrice(request));
    com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
    assertNotNull(status);
    assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
    assertEquals("The access token not found", status.getMessage());
    for (Any any : status.getDetailsList()) {
        if (any.is(ErrorInfo.class)) {
            ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
            assertEquals("Invalid Token", errorInfo.getReason());
            assertEquals("com.baeldung.grpc.errorhandling", errorInfo.getDomain());
            assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
        }
    }
}

StatusProto.fromThrowable is a utility method to get the com.google.rpc.Status directly from the exception.

From status::getDetailsList we get the com.google.rpc.ErrorInfo details.

4. Errors with gRPC Streams

gRPC streams allow servers and clients to send multiple messages in a single RPC call.

In terms of error propagation, the approach that we have used so far is not valid with gRPC streams. The reason is that onError() has to be the last method invoked in the RPC because, after this call, the framework severs the communication between the client and server.

When we're using streams, this is not the desired behavior. Instead, we want to keep the connection open to respond to other messages that might come through the RPC.

A good solution to this problem is to add the error to the message itself, as we show in commodity_price.proto:

import "google/rpc/status.proto";

service CommodityPriceProvider {
  
    rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
  
    rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}

message Commodity {
    string access_token = 1;
    string commodity_name = 2;
}

message StreamingCommodityQuote {
    oneof message {
        CommodityQuote comodity_quote = 1;
        google.rpc.Status status = 2;
    }
}

The function bidirectionalListOfPrices returns a stream of StreamingCommodityQuote messages. This message uses the oneof keyword to signal that it carries either a CommodityQuote or a google.rpc.Status, but never both.
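To get a feel for how a oneof behaves on the Java side, here's a small plain-Java model. OneofSketch is a hypothetical hand-written class for illustration only; the real StreamingCommodityQuote is generated by protoc and is immutable with a builder. The case names follow what the generated code uses for a oneof called message (including the comodity_quote spelling from the .proto file), and the payloads are simplified to strings.

```java
/** Hypothetical plain-Java model of a two-field oneof; the real class is generated by protoc. */
final class OneofSketch {

    /** One constant per oneof field, plus the "nothing set" case. */
    enum MessageCase { COMODITY_QUOTE, STATUS, MESSAGE_NOT_SET }

    private MessageCase messageCase = MessageCase.MESSAGE_NOT_SET;
    private Object value; // holds whichever field is currently set

    /** Setting the quote replaces any previously set status. */
    OneofSketch setComodityQuote(String quote) {
        messageCase = MessageCase.COMODITY_QUOTE;
        value = quote;
        return this;
    }

    /** Setting the status replaces any previously set quote. */
    OneofSketch setStatus(String status) {
        messageCase = MessageCase.STATUS;
        value = status;
        return this;
    }

    MessageCase getMessageCase() {
        return messageCase;
    }

    boolean hasComodityQuote() {
        return messageCase == MessageCase.COMODITY_QUOTE;
    }

    boolean hasStatus() {
        return messageCase == MessageCase.STATUS;
    }

    /** Receivers branch on the case to find out which field arrived. */
    String describe() {
        switch (messageCase) {
            case COMODITY_QUOTE:
                return "quote: " + value;
            case STATUS:
                return "status: " + value;
            default:
                return "empty";
        }
    }
}
```

The point of the model is that at most one field is set at any time, so a receiver can rely on the case value alone to decide whether it got a quote or an error.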

In the following example, if the client sends an invalid token, the server adds a status error to the body of the response:

public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {

    return new StreamObserver<Commodity>() {
        @Override
        public void onNext(Commodity request) {

            if (!"123validToken".equals(request.getAccessToken())) {

                com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
                  .setCode(Code.NOT_FOUND.getNumber())
                  .setMessage("The access token not found")
                  .addDetails(Any.pack(ErrorInfo.newBuilder()
                    .setReason("Invalid Token")
                    .setDomain("com.baeldung.grpc.errorhandling")
                    .putMetadata("insertToken", "123validToken")
                    .build()))
                  .build();
                StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
                  .setStatus(status)
                  .build();
                responseObserver.onNext(streamingCommodityQuote);
            }
            // ...
        }
        // onError and onCompleted omitted
    };
}

The code creates an instance of com.google.rpc.Status and adds it to the StreamingCommodityQuote response message. It does not invoke onError(), so the framework does not interrupt the connection with the client.
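To make the difference concrete, here's a dependency-free simulation that contrasts the two approaches. Everything in it is hypothetical: StreamErrorDemo and RecordingObserver are stand-ins written for this example, messages are plain strings, and the observer simply drops anything sent after onError, which is a simplification of what the framework does once a call has been terminated.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no gRPC dependency) of terminal onError versus in-band errors. */
final class StreamErrorDemo {

    /** Hypothetical stand-in for a response observer: nothing is delivered after onError. */
    static final class RecordingObserver {
        final List<String> delivered = new ArrayList<>();
        private boolean closed;

        void onNext(String message) {
            if (!closed) {
                delivered.add(message);
            }
        }

        void onError(String status) {
            if (!closed) {
                delivered.add("ERROR " + status);
                closed = true; // terminal: the call is over
            }
        }
    }

    static boolean isValid(String token) {
        return "123validToken".equals(token);
    }

    /** Approach 1: report a bad token through onError, which ends the call. */
    static List<String> withOnError(List<String> tokens) {
        RecordingObserver observer = new RecordingObserver();
        for (String token : tokens) {
            if (isValid(token)) {
                observer.onNext("QUOTE");
            } else {
                observer.onError("NOT_FOUND");
            }
        }
        return observer.delivered;
    }

    /** Approach 2: send the status as a regular message, so the stream stays open. */
    static List<String> withInBandStatus(List<String> tokens) {
        RecordingObserver observer = new RecordingObserver();
        for (String token : tokens) {
            if (isValid(token)) {
                observer.onNext("QUOTE");
            } else {
                observer.onNext("STATUS NOT_FOUND");
            }
        }
        return observer.delivered;
    }
}
```

With the token sequence valid, invalid, valid, the first approach delivers one quote and the error, and the final valid request gets no answer. The second approach delivers a response for all three requests, which is the behavior we want from a long-lived bidirectional stream.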

Let's look at the client implementation:

public void onNext(StreamingCommodityQuote streamingCommodityQuote) {

    switch (streamingCommodityQuote.getMessageCase()) {
        case COMODITY_QUOTE:
            CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
            logger.info("RESPONSE producer:" + commodityQuote.getProducerName() + " price:" + commodityQuote.getPrice());
            break;
        case STATUS:
            com.google.rpc.Status status = streamingCommodityQuote.getStatus();
            logger.info("Status code:" + Code.forNumber(status.getCode()));
            logger.info("Status message:" + status.getMessage());
            for (Any any : status.getDetailsList()) {
                if (any.is(ErrorInfo.class)) {
                    ErrorInfo errorInfo;
                    try {
                        errorInfo = any.unpack(ErrorInfo.class);
                        logger.info("Reason:" + errorInfo.getReason());
                        logger.info("Domain:" + errorInfo.getDomain());
                        logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
                    } catch (InvalidProtocolBufferException e) {
                        logger.error(e.getMessage());
                    }
                }
            }
            break;
        // ...
    }
}

The client gets the returned message in onNext(StreamingCommodityQuote) and uses a switch statement to distinguish between a CommodityQuote and a com.google.rpc.Status.

5. Conclusion

In this tutorial, we have shown how to implement error handling in gRPC for unary and stream-based RPC calls.

gRPC is a great framework to use for remote communications in distributed systems. In these systems, it's important to have a robust error handling implementation to help monitor the system. This is even more critical in complex architectures like microservices.

The source code of the examples can be found over on GitHub.
