1. Overview

In Apache Kafka, the consumer offset is a pointer that indicates the next message a consumer should read from a topic partition.
Normally, we don’t need to change the committed offset in a consumer application. However, sometimes we need to change it to reprocess or skip messages, for example, to fix an application bug, recover from downstream failures, rebuild state, or test.

In this tutorial, we’ll learn how to reset the consumer offset in Kafka. We’ll implement different approaches, like using the Kafka CLI tool, programmatically resetting the consumer offset, and using the Kafka Admin API.

2. Reset With Kafka CLI

The Kafka CLI tool helps us view, manage, and reset consumer offsets. It’s handy for one-time changes or during testing.

2.1. Set Up Kafka

First, we’ll configure a local Kafka instance running in KRaft mode with the following docker-compose.yml config:

version: "3.8"

services:
  kafka:
    image: confluentinc/cp-kafka:7.9.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9101:9101"
    expose:
      - '29092'
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

Next, we’ll run the above service in the terminal:

$ docker compose up -d

Then, let’s create a test-topic topic using the kafka-topics command:

$ docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 2 --replication-factor 1

Let’s produce and consume a few test messages.

2.2. Producer and Consumer

We’ll produce two test messages by executing the kafka-console-producer command:

$ docker exec -it kafka kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
>test1
>test2

Then, we’ll consume the messages by running the kafka-console-consumer command:

$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --group test-group
test1
test2

The consumer above reads both messages and commits the offset.

2.3. Reset With the kafka-consumer-groups Command

Let’s run the kafka-consumer-groups command with the --describe option to confirm the current committed offsets:

$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --describe
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
test-group      test-topic      0          0               0               0               console-consumer-636e6769-eead-4491-ac63-4179e2620dac /127.0.0.1      console-consumer
test-group      test-topic      1          2               2               0               console-consumer-636e6769-eead-4491-ac63-4179e2620dac /127.0.0.1      console-consumer

For this example, we’ll reset the offset to the earliest available value.

We should ensure that the consumer service is stopped before resetting any offset, as Kafka doesn’t allow resetting offsets for a consumer group with active members.

First, we’ll preview the new offset by running kafka-consumer-groups with the --dry-run option:

$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group test-group --topic test-topic --reset-offsets --to-earliest --dry-run
GROUP                          TOPIC                          PARTITION  NEW-OFFSET 
test-group                     test-topic                     0          0
test-group                     test-topic                     1          0

From the data above, we confirm the new offset value before applying the change.

Then, let’s run the kafka-consumer-groups command with the --execute option to reset the offset:

$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group test-group --topic test-topic --reset-offsets --to-earliest --execute

Finally, we’ll execute the kafka-console-consumer command to restart the same consumer:

$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic test-topic --group test-group
test1
test2

The consumer above replays the same two messages after the offset reset.

Here are the other --reset-offsets options for the kafka-consumer-groups command:

  • --to-latest – Moves to the most recent offset, making the consumer skip previous messages and process only new data
  • --to-offset <offset> – Moves the consumer to a specific, fixed offset number
  • --shift-by <n> – Moves the offset forward (+n) or backward (-n) from its current position
  • --to-datetime <timestamp> – Resets the offset to the position corresponding to a specific timestamp
  • --by-duration <duration> – Resets the offset to a point in the recent past, e.g., one hour ago
  • --from-file <file> – Resets the offsets to the values listed in a CSV file
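
For instance, here’s a sketch of resetting to a specific point in time with --to-datetime (the timestamp here is only an example):

$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group test-group --topic test-topic --reset-offsets \
  --to-datetime 2024-01-01T00:00:00.000 --execute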

We should be careful when resetting offsets in a production environment, and always verify the change with --dry-run before executing it.

3. Reset Programmatically in the Consumer

Kafka provides a client API for programmatically seeking to a particular offset in the consumer service. The new position can be the beginning, the end, the offset for a specific timestamp, or an explicit offset value.

For this example, we’ll implement a common use case: replaying all messages after a specific time.

3.1. Implement the Consumer Rebalance Listener

To seek to a desired offset or timestamp, we’ll need to implement the ConsumerRebalanceListener interface and place the seeking logic in its partition-assignment callback.

The implementation will hold the KafkaConsumer, a replayFromTimeInEpoch timestamp, and a seekDone flag that tracks whether the replay seek has already been performed.

Let’s implement the ReplayRebalanceListener class with the onPartitionsAssigned method:

public class ReplayRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final long replayFromTimeInEpoch;
    private final AtomicBoolean seekDone = new AtomicBoolean(false);

    public ReplayRebalanceListener(KafkaConsumer<String, String> consumer, long replayFromTimeInEpoch) {
        this.consumer = consumer;
        this.replayFromTimeInEpoch = replayFromTimeInEpoch;
    }
    
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // no-op: the interface requires this callback, but there's no state to clean up
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (seekDone.get() || partitions.isEmpty()) {
            return;
        }

        Map<TopicPartition, Long> partitionsTimestamp = partitions.stream()
          .collect(Collectors.toMap(Function.identity(), tp -> replayFromTimeInEpoch));

        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(partitionsTimestamp);

        partitions.forEach(tp -> {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            }
        });

        seekDone.set(true);
    }
}

In the code above, we seek each assigned partition to the offset corresponding to the given timestamp. The onPartitionsAssigned method runs before the consumer starts polling for ConsumerRecords. Additionally, if the seek has already been performed, the method returns without re-seeking, so a later rebalance doesn’t trigger an unnecessary replay.

3.2. Implement the Kafka Consumer Service

We’ll implement a Kafka consumer service using the above ReplayRebalanceListener class.

First, we’ll define the KafkaConsumerService class with a KafkaConsumer and a running boolean flag:

public class KafkaConsumerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public KafkaConsumerService(Properties consumerProps, String topic, long replayFromTimestampInEpoch) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        if (replayFromTimestampInEpoch > 0) {
            consumer.subscribe(List.of(topic),
              new ReplayRebalanceListener(consumer, replayFromTimestampInEpoch));
        } else {
            consumer.subscribe(List.of(topic));
        }
    }
}

In the code above, we subscribe the Kafka consumer to the topic and register the ReplayRebalanceListener only if replayFromTimestampInEpoch is positive.

We’ll implement the start method in the above class to start polling for ConsumerRecords:

public void start() {
    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> log.info("topic={} partition={} offset={} key={} value={}",
              record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            consumer.commitSync();
        }
    } catch (WakeupException ex) {
        if (running.get()) {
            log.error("Error in the Kafka Consumer with exception {}", ex.getMessage(), ex);
            throw ex;
        }
    } finally {
        consumer.close();
    }
}

In the code above, the consumer continuously polls the topic, logs each record, and then commits the offsets synchronously.

Also, let’s add a shutdown method to gracefully shut down the consumer service:

public void shutdown() {
    running.set(false);
    consumer.wakeup();
}

In a Spring application, we can achieve the same behavior with Spring Kafka’s ConsumerSeekAware interface.
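
Here’s a minimal sketch of that approach; the listener class name and the hard-coded replay timestamp are purely illustrative:

public class ReplayingSeekAwareListener implements ConsumerSeekAware {

    // assumption: in a real application, the replay point would come from configuration
    private final long replayFromTimeInEpoch = 1704067200000L;

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // seek every newly assigned partition to the offset for the given timestamp
        callback.seekToTimestamp(assignments.keySet(), replayFromTimeInEpoch);
    }

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message) {
        // process both replayed and new messages here
    }
}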

Next, we’ll test the KafkaConsumerService class.

3.3. Test the Consumer Service

We’ll implement integration tests for the KafkaConsumerService class using Testcontainers for Kafka.

Let’s consider a test case where the consumer replays messages from a particular timestamp. We’ll then verify what was committed by polling with a verification consumer that uses the same consumer group.

We’ll implement the consumeFromCommittedOffset helper method to read any messages remaining after the committed offset:

private List<String> consumeFromCommittedOffset(String topic, String groupId) {
    List<String> values = new ArrayList<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig(groupId))) {
        consumer.subscribe(Collections.singleton(topic));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> r : records) {
           values.add(r.value());
        }
    }

    return values;
}

We’ll implement a test that sends two messages and then replays the consumer from a timestamp between them:

@Test
void givenConsumerReplayIsEnabled_whenReplayTimestampIsProvided_thenConsumesFromTimestamp() {
    KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig());
    long firstMsgTs = System.currentTimeMillis();
    producer.send(new ProducerRecord<>("test-topic-1", 0, firstMsgTs, "x1", "test1"));
    producer.flush();

    long baseTs = System.currentTimeMillis();

    long secondMsgTs = baseTs + 1L;
    producer.send(new ProducerRecord<>("test-topic-1", 0, secondMsgTs, "x2", "test2"));
    producer.flush();

    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig("test-group-1"),
      "test-topic-1",
      baseTs);
    new Thread(kafkaConsumerService::start).start();

    Awaitility.await()
      .atMost(45, TimeUnit.SECONDS)
      .pollInterval(1, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          List<String> consumed = consumeFromCommittedOffset("test-topic-1", "test-group-1");
          assertEquals(0, consumed.size());
          assertFalse(consumed.contains("test1"));
          assertFalse(consumed.contains("test2"));
      });

    kafkaConsumerService.shutdown();
}

We’ll verify the test assertions and the KafkaConsumerService logs:

16:49:38.898 [Thread-5] INFO c.b.k.r.c.KafkaConsumerService - topic=test-topic-1 partition=0 offset=1 key=x2 value=test2

From the test above, we confirm that the consumer resumes from the provided timestamp and skips the earlier messages.

We should note that it’s recommended to disable ENABLE_AUTO_COMMIT_CONFIG in the consumer. Since auto-commits happen periodically and independently of rebalances, they can lead to duplicate message processing.
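
For reference, here’s a minimal consumer configuration sketch with auto-commit disabled (the bootstrap server and group id are placeholder values):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// disable auto-commit so offsets are only committed explicitly via commitSync()
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");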

4. Reset With Kafka Admin API

Instead of using the Kafka CLI or changing the consumer-side code, we can integrate offset reset management into our existing automation tools.

Kafka provides the Admin API to programmatically administer and manage the cluster. Using this API, we can change the offset for any topic and consumer group.

4.1. Implement the Kafka Offset Admin Service

First, we’ll implement the ResetOffsetService class that wraps an AdminClient instance:

public class ResetOffsetService {
    private static final Logger log = LoggerFactory.getLogger(ResetOffsetService.class);
    private final AdminClient adminClient;

    public ResetOffsetService(String bootstrapServers) {
        this.adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
    }
}

Next, we’ll implement the fetchPartitions method to get the topic partitions data in the above class:

private List<TopicPartition> fetchPartitions(String topic) throws ExecutionException, InterruptedException {
    return adminClient.describeTopics(List.of(topic))
      .values()
      .get(topic)
      .get()
      .partitions()
      .stream()
      .map(p -> new TopicPartition(topic, p.partition()))
      .toList();
}

Then, let’s implement the fetchEarliestOffsets method to fetch the earliest offset value for each of those partitions:

private Map<TopicPartition, OffsetAndMetadata> fetchEarliestOffsets(List<TopicPartition> partitions) {
    Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
      .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest()));

    ListOffsetsResult offsetsResult = adminClient.listOffsets(offsetSpecs);
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    partitions.forEach(tp -> {
        long offset = Optional.ofNullable(offsetsResult.partitionResult(tp))
          .map(kafkaFuture -> {
              try {
                  return kafkaFuture.get();
              } catch (InterruptedException | ExecutionException ex) {
                  log.error("Error in the Kafka Consumer reset with exception {}", ex.getMessage(), ex);
                  throw new RuntimeException(ex);
              }
           })
           .map(ListOffsetsResult.ListOffsetsResultInfo::offset)
           .orElseThrow(() -> new RuntimeException("No offset result returned for partition " + tp));
        
        offsets.put(tp, new OffsetAndMetadata(offset));
    });

    return offsets;
}

In the code above, we fetch the offset value per partition using the AdminClient’s listOffsets method and return the result as a Map<TopicPartition, OffsetAndMetadata>.

To reset the offset, we’ll first fetch the partition information, build the earliest-offsets map, and then alter the offsets using the AdminClient’s alterConsumerGroupOffsets method.

Finally, we’ll implement the reset method with the topic and the consumerGroup as input in the above class:

public void reset(String topic, String consumerGroup) {
    List<TopicPartition> partitions;
    try {
        partitions = fetchPartitions(topic);
    } catch (ExecutionException | InterruptedException ex) {
        log.error("Error in the fetching partitions with exception {}", ex.getMessage(), ex);
        throw new RuntimeException(ex);
    }

    Map<TopicPartition, OffsetAndMetadata> earliestOffsets = fetchEarliestOffsets(partitions);

    try {
        adminClient.alterConsumerGroupOffsets(consumerGroup, earliestOffsets)
          .all()
          .get();
    } catch (InterruptedException | ExecutionException ex) {
        log.error("Error in the Kafka Consumer reset with exception {}", ex.getMessage(), ex);
        throw new RuntimeException(ex);
    }
}

In the code above, we throw a RuntimeException if any of these operations fails.

Instead of resetting to the earliest offset, we can also reset to the offset for a particular timestamp, to the latest offset, or to any explicit offset value.
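
As an illustration, here’s a sketch of a variant that fetches the offsets for a given timestamp using OffsetSpec.forTimestamp(); the method name and the simplified error handling are our own:

private Map<TopicPartition, OffsetAndMetadata> fetchOffsetsForTimestamp(List<TopicPartition> partitions, long timestampInEpoch)
  throws ExecutionException, InterruptedException {
    Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
      .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.forTimestamp(timestampInEpoch)));

    ListOffsetsResult offsetsResult = adminClient.listOffsets(offsetSpecs);
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (TopicPartition tp : partitions) {
        long offset = offsetsResult.partitionResult(tp).get().offset();
        // listOffsets returns -1 when no record is newer than the given timestamp
        if (offset >= 0) {
            offsets.put(tp, new OffsetAndMetadata(offset));
        }
    }
    return offsets;
}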

4.2. Testing the Admin Service

Similar to the earlier test, we’ll implement integration tests for the ResetOffsetService class using Testcontainers.

Let’s implement the fetchCommittedOffset method to get the partition’s committed offset:

private long fetchCommittedOffset(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, OffsetAndMetadata> offsets = testAdminClient.listConsumerGroupOffsets(groupId)
      .partitionsToOffsetAndMetadata()
      .get();

    return offsets.values()
      .iterator()
      .next()
      .offset();
}

Now, we’ll write a test to verify the ResetOffsetService’s reset method:

@Test
void givenMessagesAreConsumed_whenOffsetIsReset_thenOffsetIsSetToEarliest() {
    KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig());
    producer.send(new ProducerRecord<>("test-topic-1", "msg-1"));
    producer.send(new ProducerRecord<>("test-topic-1", "msg-2"));
    producer.flush();

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig("test-group-1"));
    consumer.subscribe(List.of("test-topic-1"));

    int consumed = 0;
    while (consumed < 2) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        consumed += records.count();
    }

    consumer.commitSync();
    consumer.close();

    await().atMost(5, SECONDS)
      .pollInterval(Duration.ofMillis(300))
      .untilAsserted(() -> assertEquals(2L, fetchCommittedOffset("test-group-1")));

    resetService.reset("test-topic-1", "test-group-1");

    await().atMost(5, SECONDS)
      .pollInterval(Duration.ofMillis(300))
      .untilAsserted(() -> assertEquals(0L, fetchCommittedOffset("test-group-1")));
}

In the code above, we assert the committed offset value both before and after the reset.

Consumers should be idempotent so that offset resets don’t cause duplicate side effects.
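
As an illustration, here’s a minimal idempotency sketch that skips records whose keys were already handled; a real system would persist the processed keys instead of keeping them in memory:

private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

public void process(ConsumerRecord<String, String> record) {
    // add() returns false if the key was already seen, e.g., after an offset reset
    if (!processedKeys.add(record.key())) {
        return;
    }
    log.info("processing value={}", record.value());
}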

5. Conclusion

In this article, we’ve learned how to reset the consumer offset in Kafka. We’ve implemented it in multiple ways: using the Kafka CLI tool, replaying with the ConsumerRebalanceListener interface, and using the AdminClient API. We’ve also tested the implementations in a Docker environment.

As always, the example code can be found over on GitHub.
