Migrate consumer groups
A consumer group is a set of consumers that work together to consume messages from one or more topics. Each consumer in the group is assigned a subset of partitions, and Kafka tracks the offset (position) of each consumer group per partition.
This offset tells Kafka which messages have already been processed, so consumers can resume from where they left off after a restart or failure. Note that the committed offset is the offset of the next message the group will consume, not the last one it processed.
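For example, you can inspect a group's committed positions at any time with Kafka's own tooling; the CURRENT-OFFSET, LOG-END-OFFSET, and LAG columns show the committed offset, the end of the partition, and the number of unprocessed messages. The broker address and group name below are placeholders:

```sh
# Describe a consumer group's committed offsets per partition
kafka-consumer-groups --bootstrap-server <broker:port> \
  --group <consumer-group> \
  --describe
```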
Challenge
When cloning data from one Kafka environment to another (e.g., production to QA), consumer group offsets cannot be directly transferred. Kafka assigns new offsets to restored messages, which means your consumers would either re-process all messages from the beginning, or skip messages entirely.
Solution
Kannika Armory and Kannika Bridge work together to solve this problem:
- Armory restores messages with a special header containing the original offset.
- Bridge reads these headers, calculates the equivalent target offsets, and applies them in the target cluster.
This allows your consumers in the target environment to continue exactly where they left off.
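As a concrete illustration using the numbers from the scenario below (the exact target offsets depend on the restore, so treat these as an example):

```
# Source topic orders-prod           Restored topic orders-qa
# offset 100  (__original_offset=100) -> offset 0
# offset 101  (__original_offset=101) -> offset 1
# offset 102  (__original_offset=102) -> offset 2
# offset 103  (__original_offset=103) -> offset 3   <- committed offset of order-processor maps here
# offset 104  (__original_offset=104) -> offset 4
```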
Prerequisites
- A Kannika Armory instance available, running on a Kubernetes environment.
- Local installation of the `kbridge` binary.
- Local installation of the `kubectl` binary.
- Local installation of `sed` (included by default on macOS and Linux).
Refer to the Setup section to set up the lab environment.
Scenario introduction
In this scenario, you will simulate migrating data from a production environment to a QA environment:

- Source topic: `orders-prod` (simulating production) - contains 5 order messages
- Target topic: `orders-qa` (simulating the QA environment) - empty, ready for restore
- Consumer group: `order-processor` - has processed 3 of 5 messages in production
The goal is to restore messages to the QA topic and migrate the consumer group offsets, so that consumers can continue processing from message 4 (not re-processing messages 1-3).
Run the setup script:
```sh
curl -fsSL https://raw.githubusercontent.com/kannika-io/armory-examples/main/install.sh | bash -s -- migrate-consumer-groups
```

Or clone the armory-examples repository:
```sh
git clone https://github.com/kannika-io/armory-examples.git
cd armory-examples
./setup migrate-consumer-groups
```

This sets up:
```
Kubernetes cluster: kannika-kind
├── Namespace: kannika-system
│   └── Kannika Armory
└── Namespace: kannika-data
    ├── EventHub: prod-kafka → kafka-source:9092
    ├── EventHub: qa-kafka → kafka-target:9093
    ├── Storage: prod-storage
    └── Backup: prod-backup

Kafka: kafka-source:9092 (localhost:9092)
├── Topic: orders-prod (5 messages starting at offset 100)
└── ConsumerGroup: order-processor (offset: 103)

Kafka: kafka-target:9093 (localhost:9093)
└── Topic: orders-qa (empty)
```

Step 1: Pause the Backup
Before restoring, pause the backup to ensure the latest data is flushed to storage:
```sh
kubectl patch backup prod-backup -n kannika-data \
  --type merge \
  -p '{"spec":{"enabled":false}}'
```

Verify the backup is paused:
```sh
kubectl get backup prod-backup -n kannika-data
```

```
NAME          STATUS
prod-backup   Paused
```
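Later, when the restore and offset migration are complete, the backup can be re-enabled the same way (optional in this lab, since the teardown script removes everything):

```sh
# Resume the backup by setting enabled back to true
kubectl patch backup prod-backup -n kannika-data \
  --type merge \
  -p '{"spec":{"enabled":true}}'
```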
Step 2: Restore messages with Kannika Armory

Create a Restore with the legacyOffsetHeader option to preserve original offsets in message headers:
```sh
kubectl apply -f tutorials/migrate-consumer-groups/restore-prod-to-qa.yaml
```

```yaml
# Restore job that restores orders-prod from backup to orders-qa
# The legacyOffsetHeader preserves original offsets for consumer group migration
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-prod-to-qa
  namespace: kannika-data
spec:
  source: prod-storage                        # Storage to read backup data from
  sink: qa-kafka                              # EventHub to restore to
  enabled: true                               # Start restoring immediately
  config:
    legacyOffsetHeader: "__original_offset"   # Header for original offset (for consumer group translation)
    topics:                                   # Topics to restore
      - source: orders-prod                   # Topic name in backup
        target: orders-qa                     # Topic name to restore to in EventHub
```

Monitor progress:
```sh
kubectl get restore restore-prod-to-qa -n kannika-data -w
```

Wait for the status to show Done.
Each restored message now contains an __original_offset header with its original offset from the source topic.
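If you want to spot-check this, the console consumer in the target broker container can print headers, assuming the image ships the standard Kafka CLI tools (the verification step later uses the same container):

```sh
# Print the headers of the restored messages; each should include __original_offset
docker exec kafka-target kafka-console-consumer \
  --bootstrap-server localhost:9093 \
  --topic orders-qa \
  --from-beginning \
  --max-messages 5 \
  --property print.headers=true
```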
Step 3: Migrate consumer offsets with Kannika Bridge
The next step is to migrate the consumer group offsets from the source topic to the target topic. We can achieve this using the three-step workflow provided by Kannika Bridge:
- Fetch the current offsets from the source topic
- Calculate the equivalent offsets on the target topic
- Apply the calculated offsets to the target topic
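Between these steps, the offsets travel as plain CSV, one row per group, topic, and partition, as the command outputs below show:

```
# group,topic,partition,offset
order-processor,orders-prod,0,103
```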
Fetch consumer group offsets from the source
Fetch the current consumer group offsets from the source topic:
```sh
kbridge fetch -b $(docker compose port kafka-source 9092) \
  -t orders-prod \
  > source_offsets.csv
```

View the fetched offsets:
```sh
cat source_offsets.csv
```

```
order-processor,orders-prod,0,103
```

This shows that the order-processor consumer group has a committed offset of 103 on partition 0 of orders-prod (the columns are group, topic, partition, offset).
Map topic names
The source offsets reference the source topic (orders-prod),
but we need to calculate offsets for the target topic (orders-qa).
Update the topic names in the CSV:
```sh
sed -i '' 's/orders-prod/orders-qa/g' source_offsets.csv
```
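The `-i ''` form is BSD sed syntax, as shipped on macOS. With GNU sed on Linux, omit the empty suffix:

```sh
# GNU sed (Linux): edit in place without a backup suffix
sed -i 's/orders-prod/orders-qa/g' source_offsets.csv
```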
Alternatively, use yq to extract the topic mappings from the restore config:

```sh
yq '.spec.config.topics[] | "s/\(.source)/\(.target)/g"' tutorials/migrate-consumer-groups/restore-prod-to-qa.yaml \
  | xargs -I{} sed -i '' '{}' source_offsets.csv
```

Verify that the topics have been renamed:
```sh
cat source_offsets.csv
```

```
order-processor,orders-qa,0,103
```

Calculate target offsets
Use the calculate command to find the equivalent offsets on the target topic.
It reads the __original_offset header from restored messages:
```sh
kbridge calculate -b $(docker compose port kafka-target 9093) \
  -H __original_offset \
  -i source_offsets.csv \
  > target_offsets.csv
```

The -H flag specifies the header name that contains the original offset.
This must match the legacyOffsetHeader value from the Restore configuration.
View the calculated offsets (note the offset has been translated from 103 to 3):
```sh
cat target_offsets.csv
```

```
order-processor,orders-qa,0,3
```

Apply the target offsets
Apply the calculated offsets to create the consumer group on the target topic:
```sh
kbridge apply -b $(docker compose port kafka-target 9093) -i target_offsets.csv
```

Step 4: Verify the migration
After applying offsets, verify they were set correctly:
```sh
docker exec kafka-target kafka-consumer-groups \
  --bootstrap-server localhost:9093 \
  --group order-processor \
  --describe
```

The consumer group should now show offsets for orders-qa:
```
GROUP            TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor  orders-qa  0          3               5               2
```

The current offset (3) corresponds to the original offset 103 in production. The LAG of 2 shows there are 2 remaining messages to process.
When you start consuming from orders-qa with the order-processor consumer group,
it will continue from where it left off in production.
The first 3 messages that were already processed are skipped.
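To see this in practice, you can consume from the target topic as that group; only the two remaining messages should be delivered. This sketch assumes the standard console consumer is available in the container, and note that consuming as the group may advance its committed offset:

```sh
# Consume as the migrated group; only the 2 unprocessed messages should appear
docker exec kafka-target kafka-console-consumer \
  --bootstrap-server localhost:9093 \
  --topic orders-qa \
  --group order-processor \
  --timeout-ms 10000
```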
Step 5: Cleanup
To clean up all resources, run the teardown script:
```sh
./teardown
```

Summary
In this tutorial, you learned how to:
- Restore data with the `legacyOffsetHeader` option to preserve original offsets in message headers
- Use the kbridge three-step workflow (fetch, calculate, apply) to migrate consumer group offsets
- Verify that consumers can continue from the correct position
This workflow ensures seamless environment cloning with consumer state preservation.