1. Overview

In this tutorial, we’ll explore how the Kafka Consumer retrieves messages from the broker. We’ll learn about the configurable properties that directly impact how many messages the Kafka Consumer reads at once. Finally, we’ll see how adjusting these settings affects the Consumer’s behavior.

2. Setting Up the Environment

Kafka Consumers fetch records for a given partition in batches of configurable size. We cannot configure the exact number of records to be fetched in one batch, but we can configure the size of these batches, measured in bytes.

For the code snippets in this article, we’ll need a simple Spring application that uses the kafka-clients library to interact with the Kafka broker. We’ll create a Java class that internally uses a KafkaConsumer to subscribe to a topic and log the incoming messages. If you want to dive deeper, feel free to read through our article dedicated to the Kafka Consumer API and follow along.

One of the key differences in our example will be the logging: instead of logging one message at a time, let’s collect the messages and log the whole batch. This way, we’ll be able to see exactly how many messages are fetched for each poll(). Additionally, let’s enrich the log with details like the initial and final offsets of the batch, along with the consumer’s groupId:

class VariableFetchSizeKafkaListener implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(VariableFetchSizeKafkaListener.class);

    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    
    // constructor

    @Override
    public void run() {
        consumer.subscribe(singletonList(topic));
        int pollCount = 1;

        while (true) {
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (var record : consumer.poll(ofMillis(500))) {
                records.add(record);
            }
            if (!records.isEmpty()) {
                String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
                String groupId = consumer.groupMetadata().groupId();
                log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
            }
        }
    }
}

The Testcontainers library will help us set up the test environment by spinning up a Docker container with a running Kafka broker. If you want to learn more about setting up the Testcontainers Kafka module, check out how we configured the test environment here and follow along.
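
For reference, the container setup might look roughly like the sketch below; the test class name and the image tag are assumptions, not part of the original code:

// Minimal Testcontainers setup sketch (class name and image tag are assumptions)
@Testcontainers
class VariableFetchSizeKafkaListenerLiveTest {

    // spins up a single-node Kafka broker in a Docker container for the tests
    @Container
    private static final KafkaContainer KAFKA_CONTAINER =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // the consumers connect via KAFKA_CONTAINER.getBootstrapServers()
}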

In our particular case, we can define an additional method that publishes several messages on a given topic. For instance, let’s assume we are streaming values read by a temperature sensor to a topic named “engine.sensors.temperature”:

void publishTestData(int recordsCount, String topic) {
    List<ProducerRecord<String, String>> records = IntStream.range(0, recordsCount)
      .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
      .collect(toList());
    // publish all to kafka
}

As we can see, we have used the same key for all the messages. As a result, all records will be sent to the same partition. For the payload, we’ve used a short, fixed text depicting a temperature measurement.
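
The publishing step itself is elided above; one possible sketch, assuming a plain KafkaProducer configured against the same container, could look like this:

// Sketch of the elided publishing step (the producer configuration here is an assumption)
Properties producerProps = new Properties();
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
    records.forEach(producer::send);
    producer.flush();
}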

3. Testing the Default Behavior

Let’s start by creating a Kafka listener using the default consumer configuration. Then, we’ll publish a few messages to see how many batches our listener consumes. Since our custom listener uses the Consumer API internally, we’ll have to configure and create a KafkaConsumer before instantiating VariableFetchSizeKafkaListener:

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

For now, we’ll use KafkaConsumer’s default values for the minimum and maximum fetch sizes. Based on this consumer, we can instantiate our listener and run it asynchronously to avoid blocking the main thread:

CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);

Finally, let’s block the test thread for a few seconds, giving the listener some time to consume the messages. The goal of this article is to start the listeners and observe how they perform. We’ll use JUnit 5 tests as a convenient way of setting up and exploring their behavior, but for simplicity, we won’t include any specific assertions. As a result, this will be our starting @Test:

@Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
    );

    Thread.sleep(5_000L);
}

Now, let’s run the test and inspect the logs to see how many records will be fetched in a single batch:

10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO  c.b.k.c.VariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -> 299

As we can see, we fetched all 300 records in a single batch because our messages are small. Both the key and the body are short strings: the key is four characters, and the body is 16 characters long. That’s a total of 20 bytes, plus some extra for the record’s metadata. On the other hand, the default value for the maximum batch size is one mebibyte (1,024 x 1,024 bytes), or simply 1,048,576 bytes.

4. Configuring Maximum Partition Fetch Size

The “max.partition.fetch.bytes” property in Kafka determines the largest amount of data a consumer can fetch from a single partition in a single request. Consequently, even for a small number of short messages, we can force our listeners to fetch the records in multiple batches by lowering this property.

To observe this, let’s create two more VariableFetchSizeKafkaListener instances and configure their maximum partition fetch size to 500 bytes and 5,000 bytes, respectively. First, let’s extract all the common consumer Properties into a dedicated method to avoid code duplication:

Properties commonConsumerProperties() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}

Then, let’s create the first listener and run it asynchronously:

Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
);

As we can see, we are setting different consumer group IDs for the various listeners. This allows each of them to consume the same test data independently. Now, let’s proceed with the second listener and complete the test:

@Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);
    
    Properties fetchSize_500B = commonConsumerProperties();
    fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
    fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
    );

    Properties fetchSize_5KB = commonConsumerProperties();
    fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
    fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
    );

    Thread.sleep(10_000L);
}

If we run this test, we can expect the first consumer to fetch batches roughly ten times smaller than the second one. Let’s analyze the logs:

[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -> 55
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -> 4
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -> 9
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -> 111
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -> 167
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -> 218
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -> 24
[...]

As expected, one of the listeners indeed fetches batches of data that are almost ten times larger than the other. Moreover, it’s important to understand that the number of records within a batch depends on the size of those records and their metadata. To highlight this, we can observe that the consumer with the groupId “max_fetch_size_5KB” fetched fewer records on its fourth poll.

5. Configuring Minimum Fetch Size

The Consumer API also allows customizing the minimum fetch size through the “fetch.min.bytes” property. This property specifies the minimum amount of data the broker should have available before responding to a fetch request. If this minimum is not met, the broker waits longer before sending a response to the consumer. To emphasize this, let’s add a delay to the test publisher in our helper method. As a result, the producer will wait a specific number of milliseconds between sending each message:

@Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic, 100L);  
    // ...
}

void publishTestData(int measurementsCount, String topic, long delayInMillis) {
    // ...
}
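
One straightforward way to implement this overload is to send the records one at a time and pause between sends. The sketch below assumes a producerProperties() helper that mirrors the producer configuration we used earlier:

// Possible implementation of the delayed publisher (producerProperties() is an assumed helper)
void publishTestData(int measurementsCount, String topic, long delayInMillis) throws InterruptedException {
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties())) {
        for (int i = 0; i < measurementsCount; i++) {
            producer.send(new ProducerRecord<>(topic, "key1", "temperature=255F"));
            producer.flush();
            // pause so the broker receives the data gradually
            Thread.sleep(delayInMillis);
        }
    }
}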

Let’s start by creating a VariableFetchSizeKafkaListener that will use the default configuration, having “fetch.min.bytes” equal to one byte. Similar to the previous examples, we’ll run this consumer asynchronously within a CompletableFuture:

// fetch.min.bytes = 1 byte (default)
Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
);

With this setup, and due to the delay we introduced, we can expect each record to be retrieved individually, one after the other. In other words, we can expect many batches of a single record. Also, we expect these batches to be consumed at roughly the pace at which our KafkaProducer publishes the data, which in our case is one message every 100 milliseconds. Let’s run the test and analyze the logs:

14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -> 0
14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -> 1
14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -> 2
14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -> 3
[...]

Moreover, we can force the consumer to wait for more data to accumulate by adjusting the “fetch.min.bytes” value to a larger size:

// fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
);

With the property set to 500 bytes, we can expect the consumer to wait longer and fetch more data at once. Let’s run this example as well and observe the outcome:

14:24:49.303 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -> 5
14:24:49.812 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -> 9
14:24:50.315 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
14:24:50.819 [worker-3] INFO - groupId: min_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[...]

6. Conclusion

In this article, we discussed how the KafkaConsumer fetches data from the broker. We learned that, by default, the consumer fetches data as soon as at least one new record is available. On the other hand, if the new data for the partition exceeds 1,048,576 bytes, it is split into multiple batches of at most that size. We discovered that customizing the “fetch.min.bytes” and “max.partition.fetch.bytes” properties allows us to tailor Kafka’s behavior to our specific requirements.
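
As a quick recap, a consumer that tunes both properties at once might be configured as follows; the group ID and the exact values are purely illustrative:

Properties props = commonConsumerProperties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tuned_fetch_size");
// the broker waits until at least 500 bytes are available (or a timeout elapses)
props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
// at most ~5 KB are returned per partition in a single fetch
props.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
KafkaConsumer<String, String> tunedConsumer = new KafkaConsumer<>(props);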

The code backing this article is available on GitHub.