Partner – Microsoft – NPI EA (cat = Baeldung)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, visit the documentation page.

You can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Microsoft – NPI EA (cat= Spring Boot)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, you can get started over on the documentation page.

And, you can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Orkes – NPI EA (cat=Spring)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

Partner – Orkes – NPI EA (tag=Microservices)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

eBook – Guide Spring Cloud – NPI EA (cat=Spring Cloud)
announcement - icon

Let's get started with a Microservice Architecture with Spring Cloud:

>> Join Pro and download the eBook

eBook – Mockito – NPI EA (tag = Mockito)
announcement - icon

Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.

Get started with mocking and improve your application tests using our Mockito guide:

Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Reactive – NPI EA (cat=Reactive)
announcement - icon

Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:

>> Join Pro and download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Jackson – NPI EA (cat=Jackson)
announcement - icon

Do JSON right with Jackson

Download the E-book

eBook – HTTP Client – NPI EA (cat=Http Client-Side)
announcement - icon

Get the most out of the Apache HTTP Client

Download the E-book

eBook – Maven – NPI EA (cat = Maven)
announcement - icon

Get Started with Apache Maven:

Download the E-book

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

eBook – RwS – NPI EA (cat=Spring MVC)
announcement - icon

Building a REST API with Spring?

Download the E-book

Course – LS – NPI EA (cat=Jackson)
announcement - icon

Get started with Spring and Spring Boot, through the Learn Spring course:

>> LEARN SPRING
Course – RWSB – NPI EA (cat=REST)
announcement - icon

Explore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:

>> The New “REST With Spring Boot”

Course – LSS – NPI EA (cat=Spring Security)
announcement - icon

Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.

I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.

You can explore the course here:

>> Learn Spring Security

Course – LSD – NPI EA (tag=Spring Data JPA)
announcement - icon

Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.

Get started with Spring Data JPA through the guided reference course:

>> CHECK OUT THE COURSE

Partner – MongoDB – NPI EA (tag=MongoDB)
announcement - icon

Traditional keyword-based search methods rely on exact word matches, often leading to irrelevant results depending on the user's phrasing.

By comparison, using a vector store allows us to represent the data as vector embeddings, based on meaningful relationships. We can then compare the meaning of the user’s query to the stored content, and retrieve more relevant, context-aware results.

Explore how to build an intelligent chatbot using MongoDB Atlas, Langchain4j and Spring Boot:

>> Building an AI Chatbot in Java With Langchain4j and MongoDB Atlas

Partner – LambdaTest – NPI EA (cat=Testing)
announcement - icon

Accessibility testing is a crucial aspect to ensure that your application is usable for everyone and meets accessibility standards that are required in many countries.

By automating these tests, teams can quickly detect issues related to screen reader compatibility, keyboard navigation, color contrast, and other aspects that could pose a barrier to using the software effectively for people with disabilities.

Learn how to automate accessibility testing with Selenium and the LambdaTest cloud-based testing platform that lets developers and testers perform accessibility automation on over 3000+ real environments:

Automated Accessibility Testing With Selenium

eBook – Reactive – NPI(cat= Reactive)
announcement - icon

Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:

>> Join Pro and download the eBook

1. Overview

In this article, we will look at the way the RxJava library helps us to handle backpressure.

Simply put – RxJava utilizes a concept of reactive streams by introducing Observables, to which one or many Observers can subscribe to. Dealing with possibly infinite streams is very challenging, as we need to face a problem of a backpressure.

It’s not difficult to get into a situation in which an Observable is emitting items more rapidly than a subscriber can consume them. We will look at the different solutions to the problem of growing buffer of unconsumed items.

2. Hot Observables Versus Cold Observables

First, let’s create a simple consumer function that will be used as a consumer of elements from Observables that we will define later:

public class ComputeFunction {
    public static void compute(Integer v) {
        try {
            System.out.println("compute integer v: " + v);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Our compute() function is simply printing the argument. The important thing to notice here is an invocation of a Thread.sleep(1000) method – we are doing it to emulate some long running task that will cause Observable to fill up with items quicker that Observer can consume them.

We have two types of the Observables – Hot and Cold – that are totally different when it comes to a backpressure handling.

2.1. Cold Observables

A cold Observable emits a particular sequence of items but can begin emitting this sequence when its Observer finds it to be convenient, and at whatever rate the Observer desires, without disrupting the integrity of the sequence. Cold Observable is providing items in a lazy way.

The Observer is taking elements only when it is ready to process that item, and items do not need to be buffered in an Observable because they are requested in a pull fashion.

For example, if you create an Observable based on a static range of elements from one to one million, that Observable would emit the same sequence of items no matter how frequently those items are observed:

Observable.range(1, 1_000_000)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute);

When we start our program, items will be computed by Observer lazily and will be requested in a pull fashion. The Schedulers.computation() method means that we want to run our Observer within a computation thread pool in RxJava.

The output of a program will consist of a result of a compute() method invoked for one by one item from an Observable:

compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...

Cold Observables do not need to have any form of a backpressure because they work in a pull fashion. Examples of items emitted by a cold Observable might include the results of a database query, file retrieval, or web request.

2.2. Hot Observables

A hot Observable begins generating items and emits them immediately when they are created. It is contrary to a Cold Observables pull model of processing. Hot Observable emits items at its own pace, and it is up to its observers to keep up.

When the Observer is not able to consume items as quickly as they are produced by an Observable they need to be buffered or handled in some other way, as they will fill up the memory, finally causing OutOfMemoryException.

Let’s consider an example of hot Observable, that is producing a 1 million items to an end consumer that is processing those items. When a compute() method in the Observer takes some time to process every item, the Observable is starting to fill up a memory with items, causing a program to fail:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

IntStream.range(1, 1_000_000).forEach(source::onNext);

Running that program will fail with a MissingBackpressureException because we didn’t define a way of handling overproducing Observable.

Examples of items emitted by a hot Observable might include mouse & keyboard events, system events, or stock prices.

3. Buffering Overproducing Observable

The first way to handle overproducing Observable is to define some kind of a buffer for elements that cannot be processed by an Observer.

We can do it by calling a buffer() method:

PublishSubject<Integer> source = PublishSubject.<Integer>create();
        
source.buffer(1024)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

Defining a buffer with a size of 1024 will give an Observer some time to catch up to an overproducing source. The buffer will store items that were not yet processed.

We can increase a buffer size to have enough room for produced values.

Note however that generally, this may be only a temporary fix as the overflow can still happen if the source overproduces the predicted buffer size.

4. Batching Emitted Items

We can batch overproduced items in windows of N elements.

When Observable is producing elements quicker than Observer can process them, we can alleviate this by grouping produced elements together and sending a batch of elements to Observer that is able to process a collection of elements instead of element one by one:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.window(500)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

Using window() method with argument 500, will tell Observable to group elements into the 500-sized batches. This technique can reduce a problem of overproducing Observable when Observer is able to process a batch of elements quicker comparing to processing elements one by one.

5. Skipping Elements

If some of the values produced by Observable can be safely ignored, we can use the sampling within a specific time and throttling operators.

The methods sample() and throttleFirst() are taking duration as a parameter:

  • The sample() method periodically looks into the sequence of elements and emits the last item that was produced within the duration specified as a parameter
  • The throttleFirst() method emits the first item that was produced after the duration specified as a parameter

The duration is a time after which one specific element is picked from the sequence of produced elements. We can specify a strategy for handling backpressure by skipping elements:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.sample(100, TimeUnit.MILLISECONDS)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

We specified that strategy of skipping elements will be a sample() method. We want a sample of a sequence from 100 milliseconds duration. That element will be emitted to the Observer.

Remember, however, that these operators only reduce the rate of value reception by the downstream Observer and thus they may still lead to MissingBackpressureException.

6. Handling a Filling Observable Buffer

In case that our strategies of sampling or batching elements do not help with filling up a buffer, we need to implement a strategy of handling cases when a buffer is filling up.

We need to use an onBackpressureBuffer() method to prevent BufferOverflowException.

The onBackpressureBuffer() method takes three arguments: a capacity of an Observable buffer, a method that is invoked when a buffer is filling up, and a strategy for handling elements that need to be discarded from a buffer. Strategies for overflow are in a BackpressureOverflow class.

There are 4 types of actions that can be executed when the buffer fills up:

  • ON_OVERFLOW_ERROR – this is the default behavior signaling a BufferOverflowException when the buffer is full
  • ON_OVERFLOW_DEFAULT – currently it is the same as ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST – if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream Observer requests
  • ON_OVERFLOW_DROP_OLDEST – drops the oldest element in the buffer and adds the current value to it

Let’s see how to specify that strategy:

Observable.range(1, 1_000_000)
  .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
  .observeOn(Schedulers.computation())
  .subscribe(e -> {}, Throwable::printStackTrace);

Here our strategy for handling the overflowing buffer is dropping the oldest element in a buffer and adding newest item produced by an Observable.

Note that the last two strategies cause a discontinuity in the stream as they drop out elements. In addition, they won’t signal BufferOverflowException.

7. Dropping All Overproduced Elements

Whenever the downstream Observer is not ready to receive an element, we can use an onBackpressureDrop() method to drop that element from the sequence.

We can think of that method as an onBackpressureBuffer() method with a capacity of a buffer set to zero with a strategy ON_OVERFLOW_DROP_LATEST.

This operator is useful when we can safely ignore values from a source Observable (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on:

Observable.range(1, 1_000_000)
  .onBackpressureDrop()
  .observeOn(Schedulers.computation())
  .doOnNext(ComputeFunction::compute)
  .subscribe(v -> {}, Throwable::printStackTrace);

The method onBackpressureDrop() is eliminating a problem of overproducing Observable but needs to be used with caution.

8. Conclusion

In this article, we looked at a problem of overproducing Observable and ways of dealing with a backpressure. We looked at strategies of buffering, batching and skipping elements when the Observer is not able to consume elements as quickly as they are produced by an Observable.

The code backing this article is available on GitHub. Once you're logged in as a Baeldung Pro Member, start learning and coding on the project.
Baeldung Pro – NPI EA (cat = Baeldung)
announcement - icon

Baeldung Pro comes with both absolutely No-Ads as well as finally with Dark Mode, for a clean learning experience:

>> Explore a clean Baeldung

Once the early-adopter seats are all used, the price will go up and stay at $33/year.

Partner – Microsoft – NPI EA (cat = Baeldung)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, visit the documentation page.

You can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Microsoft – NPI EA (cat = Spring Boot)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, visit the documentation page.

You can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Orkes – NPI EA (cat = Spring)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

Partner – Orkes – NPI EA (tag = Microservices)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

eBook – HTTP Client – NPI EA (cat=HTTP Client-Side)
announcement - icon

The Apache HTTP Client is a very robust library, suitable for both simple and advanced use cases when testing HTTP endpoints. Check out our guide covering basic request and response handling, as well as security, cookies, timeouts, and more:

>> Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

Partner – MongoDB – NPI EA (tag=MongoDB)
announcement - icon

Traditional keyword-based search methods rely on exact word matches, often leading to irrelevant results depending on the user's phrasing.

By comparison, using a vector store allows us to represent the data as vector embeddings, based on meaningful relationships. We can then compare the meaning of the user’s query to the stored content, and retrieve more relevant, context-aware results.

Explore how to build an intelligent chatbot using MongoDB Atlas, Langchain4j and Spring Boot:

>> Building an AI Chatbot in Java With Langchain4j and MongoDB Atlas

Course – LS – NPI EA (cat=REST)

announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

eBook Jackson – NPI EA – 3 (cat = Jackson)