Restore
A Restore is used to restore messages from cold storage to a message source like Kafka.
It offers many configuration options to support a wide range of use cases: you can restore data from multiple topics in parallel, restore data up to a certain point in time, or restore data up to a certain offset.
Another use case is restoring or migrating data from one cluster to another. It supports mapping schemas between topics, renaming topics, and much more.
Restore resources can be managed using the kubectl command line tool,
and are available by the name restore or restores.
```
$ kubectl get restores
NAME         STATUS      AGE
my-restore   Restoring   1s
```

Creating a Restore

The following is an example of a Restore.
It will restore 2 topics from the my-volume-storage Storage to the my-kafka-cluster Endpoint.
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-example
spec:
  source: "my-volume-storage"
  sink: "my-kafka-cluster"
  enabled: true
  config:
    topics:
      - target: target-topicA
        source: source-topicA
      - target: target-topicB
        source: source-topicB
```

In this example:
- A Restore named restore-example is created, indicated by the .metadata.name field. This name becomes the basis for the Kubernetes Job (and other resources) that are created later.
- The Restore will connect to the my-volume-storage Storage to fetch data, indicated by the .spec.source field, and will restore the data to the my-kafka-cluster Endpoint defined in the .spec.sink field.
- The .spec.config.topics field contains the mapping between the source and target topics. In this example, the topics source-topicA and source-topicB will be restored to target-topicA and target-topicB respectively.
- The .spec.enabled field indicates whether the Restore should be started. This field is optional and defaults to false, so the Restore will not start unless it is explicitly set to true. See the Drafting and starting a Restore section for more information.
Drafting and starting a Restore
The following is an example of a Draft Restore.
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-example
spec:
  source: "my-volume-storage"
  sink: "my-kafka-cluster"
  enabled: false
  config: {}
```

This is a minimal Restore definition.
In this state,
the Restore will not be started (the Job will not be created yet)
because the .spec.enabled field is not set to true.
The Restore is in a Draft state.
At this stage of the Restore lifecycle, the Restore can act as a working document, and be edited and modified as needed. More importantly, topics can be imported and mapped to target topics, or removed.
To start the Restore,
the .spec.enabled field must be set to true.
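For example, you can flip the field with kubectl patch (editing the resource with kubectl edit works just as well); restore-example is the draft from the example above:

```
$ kubectl patch restore restore-example --type='merge' -p '{"spec":{"enabled":true}}'
```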
Restore to multiple topics in parallel
The following is an example of a Restore that restores multiple topics in parallel.
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: parallel-example
spec:
  source: "my-volume-storage"
  sink: "my-sink"
  enabled: true
  config:
    parallelism: 3
```

In this example,
the .spec.config.parallelism field is set to 3.
This means that the Restore will restore 3 topics in parallel.
This is useful, for example:
- when restoring a large number of topics
- when restoring large, single partition topics
By default, a Restore will use a single producer at any given time. This may limit the amount of concurrency achievable if the target topics are configured in a way that requires different producer configurations.
At this time,
a matching value of message.max.bytes is the only cluster-side topic setting considered for sharing a producer,
but more may be used in the future.
To increase the number of producers a Restore is allowed to use concurrently,
set the .spec.config.maxProducers field:
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: parallel-example
spec:
  source: "my-volume-storage"
  sink: "my-sink"
  enabled: true
  config:
    parallelism: 3
    maxProducers: 2
```

In this example,
the .spec.config.parallelism field is set to 3
and .spec.config.maxProducers is set to 2.
This means that the Restore will restore up to 3 topics in parallel
and that it is allowed to create an additional producer if required.
Adding the legacy offset to the headers
You can add the original offsets to the headers of the restored messages. The value of the header will be the original offset as a string.
The following is an example of a Restore that adds the original offset to the headers of the restored message.
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: legacy-offset-header-example
spec:
  source: "my-volume-storage"
  sink: "sink"
  enabled: true
  config:
    legacyOffsetHeader: "original-offset"
    topics:
      - target: target-topic
        source: source-topic
```

In this example,
the .spec.config.legacyOffsetHeader field is set to original-offset.
This means that the original offset will be added to the headers of the restored messages with the key original-offset.
If you wish to add the header only to a specific topic and not to all topics, you must split the Restore into multiple Restores.
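A minimal sketch of such a split, assuming only source-topicA should receive the header; the resource names and topic names below are illustrative:

```yaml
# Restore that adds the legacy offset header to one topic
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-with-offset-header
spec:
  source: "my-volume-storage"
  sink: "sink"
  enabled: true
  config:
    legacyOffsetHeader: "original-offset"
    topics:
      - target: target-topicA
        source: source-topicA
---
# Restore for the remaining topics, without the header
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-without-offset-header
spec:
  source: "my-volume-storage"
  sink: "sink"
  enabled: true
  config:
    topics:
      - target: target-topicB
        source: source-topicB
```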
Restoring data between two dates
It is possible to restore only records whose timestamp falls within a given time period.
Two settings are available to achieve this:
- restoreFromDateTime: start restoring from the given datetime
- restoreUntilDateTime: restore data up to the given datetime
When used together, those settings define a time window.
The following is an example of a Restore that restores the data up to the start of the year 2021:
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-until-example
spec:
  source: "my-volume-storage"
  sink: "sink"
  enabled: true
  config:
    restoreUntilDateTime: "2021-01-01T00:00:00Z"
    topics:
      - target: target-topic
        source: source-topic
```

In this example,
the .spec.config.restoreUntilDateTime field is set to 2021-01-01T00:00:00Z.
This means that the Restore job will only restore data with a timestamp earlier than January 1st, 2021.
Any messages with a timestamp at or after this point in time will not be restored.
In the next example, we define a time window using both parameters in order to restore one month's worth of data:
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-window-example
spec:
  source: "my-volume-storage"
  sink: "sink"
  enabled: true
  config:
    restoreFromDateTime: "2021-01-01T00:00:00Z"
    restoreUntilDateTime: "2021-02-01T00:00:00Z"
    topics:
      - target: target-topic
        source: source-topic
```

In this example,
the .spec.config.restoreFromDateTime and .spec.config.restoreUntilDateTime are used together to instruct the Restore to select only records from the month of January 2021.
Selecting offsets to restore
For each partition of a given topic, you can define a range of offsets to restore.
The following is an example of a Restore using multiple filtering rules:
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-until-example
spec:
  source: "source"
  sink: "sink"
  enabled: true
  config:
    topics:
      - target: target-topic
        source: source-topic
        partitions:
          - number: 0
            restoreUntilOffset: 100
          - number: 1
            restoreFromOffset: 50
            restoreUntilOffset: 1500
          - number: 2
```

In this example, we’ve defined various combinations of parameters to request different filtering rules.
The Restore job will only restore:
- offsets from 0 (inclusive) to 100 (exclusive) from partition #0;
- offsets from 50 (inclusive) to 1500 (exclusive) from partition #1;
- everything from partition #2;
- nothing from every other partition (if any).
Preflight checks
A Restore performs preflight checks before starting the restore process for each topic. These checks are used to ensure that the topic can be restored without any issues.
The current checks are:
- Check if the target topic exists
- Check if the partition count of the source and target topics are the same
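You can verify the same conditions yourself before enabling a Restore, for instance with the standard Kafka CLI. A sketch, assuming a reachable bootstrap server (placeholders in angle brackets):

```
# Check that the target topic exists and inspect its partition count
$ kafka-topics.sh --bootstrap-server <broker:9092> --describe --topic target-topic
```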
The following checks are planned for the future:
- Check if the target topic is empty
- Check if the target topic has the same configuration as the source topic
Check the roadmap for more information.
Disabling preflight checks
Section titled “Disabling preflight checks”In some cases, it can be necessary to disable the preflight checks.
Fine-grained control over the preflight checks is not available yet,
but you can disable all preflight checks for a specific topic by setting the disablePreflightChecks field to true in the mapping configuration.
Example:
```yaml
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: restore-without-preflight-checks
spec:
  sink: "sink"
  source: "source"
  enabled: true
  config:
    topics:
      - target: target-topic-name
        source: source-topic-name
        disablePreflightChecks: true # Disable preflight checks for this topic
```

Pausing and resuming a Restore
You can pause and resume a Restore while it is in progress.
Even though a Restore is designed to be robust and includes retries and error handling, it can be interrupted for numerous reasons:
- Kubernetes terminated the Pod due to resource constraints.
- The associated Pod got deleted.
- The process failed for some other reason.
In these cases, you may want to resume the Restore from where it left off.
Another situation where resuming a restore is useful is when you have restored data to another environment, perhaps while a Backup was running, and then you want to restore more data at a later time.
In any case, you can restart and resume the Restore from the last successful point.
Pausing a Restore
You can pause a Restore by setting the .spec.enabled field to false.
```
$ kubectl patch restore <restore-name> --type='merge' -p '{"spec":{"enabled":false}}'
```

Resuming a Restore
You can resume the Restore by deleting the associated Job. If the Restore is enabled, the operator will automatically generate a new Job and restart the Restore process.
```
# Delete the job
$ kubectl delete job $(kubectl get restore <restore-name> -o jsonpath='{.metadata.annotations.io\.kannika/restore-job-name}')

# Make sure the Restore is enabled
$ kubectl patch restore <restore-name> --type='merge' -p '{"spec":{"enabled":true}}'
```

The name of the Job is stored in the io.kannika/restore-job-name annotation on the Restore.
The Job itself has the io.kannika/restore label.
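For example, you can list the Jobs created for Restores in the current namespace with a label-existence selector:

```
$ kubectl get jobs -l io.kannika/restore
```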
How is progress tracked?
The progress of a Restore is tracked by the Restore Report.
When a Restore receives a SIGTERM or SIGKILL signal, it will gracefully stop processing new messages and update the Restore Report with the latest progress. The graceful shutdown period is determined by the Kubernetes settings for the Pod.
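The relevant setting is the standard Kubernetes terminationGracePeriodSeconds on the Pod spec, not a Kannika-specific field; whether and how you can tune it for the generated Job depends on your setup. A generic Kubernetes sketch for illustration only:

```yaml
# Plain Kubernetes Pod (illustrative only, not a Restore resource)
apiVersion: v1
kind: Pod
metadata:
  name: graceful-shutdown-example
spec:
  terminationGracePeriodSeconds: 60  # seconds between SIGTERM and SIGKILL
  containers:
    - name: main
      image: busybox:stable
      command: ["sleep", "3600"]
```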
In case of errors, the Restore will also update the Restore Report with the latest progress and error information.
When the Restore is resumed, it will read the Restore Report and continue restoring from the last successful offset for each partition.
Restarting from scratch
If you want to restart the Restore from scratch, there are a few options.
To use the same Restore resource, you need to:
- first delete the Restore Report associated with the Restore,
- then delete the Job associated with the Restore.
Alternatively,
taking a copy of the existing Restore might be easier,
as long as the io.kannika/report-pvc-name and io.kannika/restore-job-name annotations are not copied over.
```
# Delete the report
$ kubectl delete pvc $(kubectl get restore <restore-name> -o jsonpath='{.metadata.annotations.io\.kannika/report-pvc-name}')

# Delete the job
$ kubectl delete job $(kubectl get restore <restore-name> -o jsonpath='{.metadata.annotations.io\.kannika/restore-job-name}')

# Make sure the Restore is enabled
$ kubectl patch restore <restore-name> --type='merge' -p '{"spec":{"enabled":true}}'
```
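If you prefer the copy approach instead, a rough sketch (the file name and the new resource name are placeholders; review the exported YAML before applying it):

```
# Export the existing Restore to a file
$ kubectl get restore <restore-name> -o yaml > restore-copy.yaml

# Edit restore-copy.yaml: give it a new .metadata.name, drop the
# io.kannika/report-pvc-name and io.kannika/restore-job-name annotations,
# and remove server-generated fields such as uid, resourceVersion, and status.

# Create the copy
$ kubectl apply -f restore-copy.yaml
```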
Mapping schemas

You can map schemas between the source and target topics when restoring data. Check the Schema Mapping page for more details.