WASI plugins
Kannika Armory supports a plugin system built on top of the WebAssembly System Interface (WASI) and the WebAssembly Component Model.
Using WASI plugins
A WASI plugin is a single WebAssembly (.wasm) file that must be made accessible to the operator using a Persistent Volume Claim (PVC).
Once that PVC has been created, you can reference it in a Backup or a Restore by defining a wasiPluginRef in spec.plugins:
apiVersion: kannika.io/v1alpha
kind: Backup
metadata:
  name: backup-example
spec:
  enabled: true
  source: "my-kafka-cluster"
  sink: "my-bucket"
  plugins:
    - wasiPluginRef:
        witVersion: "0.1"
        claimName: plugins-store-pvc
        path: my_plugin.wasm
      spec:
        truth: "Un tiens vaut mieux que deux tu l'auras"
  streams:
    - topic: "foo"
In this example, we’ve listed a wasiPluginRef entry in the list of enabled plugins for a Backup.
A wasiPluginRef has some mandatory properties:
- The witVersion tells the engine we are using version 0.1 of the wit definition files (see below).
- A claimName that references a persistent volume claim where plugins are stored.
- A path that tells the Backup where the plugin is located within the persistent volume.
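As a quick sanity check before applying a manifest, the three mandatory properties can be verified with a few lines of Python. This is only a sketch: the missing_wasi_plugin_ref_fields helper is hypothetical and not part of Kannika Armory, and it assumes the plugin entry has already been loaded into a plain dictionary.

```python
# Hypothetical helper, not part of Kannika Armory: checks that a plugin entry
# carries the three mandatory wasiPluginRef properties listed above.
MANDATORY_FIELDS = ("witVersion", "claimName", "path")


def missing_wasi_plugin_ref_fields(plugin_entry: dict) -> list:
    """Return the mandatory wasiPluginRef properties absent from plugin_entry."""
    ref = plugin_entry.get("wasiPluginRef") or {}
    return [field for field in MANDATORY_FIELDS if field not in ref]


# Same values as the Backup example.
complete_entry = {
    "wasiPluginRef": {
        "witVersion": "0.1",
        "claimName": "plugins-store-pvc",
        "path": "my_plugin.wasm",
    },
}
# An entry that only names the claim.
incomplete_entry = {"wasiPluginRef": {"claimName": "plugins-store-pvc"}}

print(missing_wasi_plugin_ref_fields(complete_entry))    # []
print(missing_wasi_plugin_ref_fields(incomplete_entry))  # ['witVersion', 'path']
```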
An optional configuration can be added using the spec field.
It must be a YAML object; it is forwarded as a string to the plugin’s initialization function.
As always, a topicSelector can be used to enable the plugin on only a subset of topics.
Restore resources accept the same kind of configuration.
Writing a WASI plugin
If you wish to write your own plugins, then you must implement the interface Kannika Armory expects.
That interface is defined using the Wasm Interface Type language. The latest WIT files for Kannika Armory are located here: https://github.com/kannika-io/armory-plugins-wit
Backups and Restores have slightly different needs so each have their own interface (or world in WIT lingo). Depending on the kind of plugin you wish to develop, you will have to implement one or the other.
Below are some guides to get you started in one of these programming languages:
In Rust
Here’s an example of how to write a repartitioning plugin in Rust, to be used with a Restore.
This plugin accepts a number of partitions, no_partitions (written noPartitions in the YAML configuration), as a configuration option and simply overwrites the original partition of each record using this logic:
new_partition = old_partition % no_partition
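To make the mapping concrete, here is a minimal Python sketch of the same arithmetic. The repartition function is a hypothetical stand-in for illustration only; the actual plugin is the Rust code in this guide.

```python
def repartition(old_partition: int, no_partitions: int) -> int:
    """Map a record's original partition onto one of no_partitions partitions."""
    if no_partitions <= 0:
        raise ValueError("no_partitions must be a positive integer")
    return old_partition % no_partitions


# Records from a 6-partition source topic, restored with no_partitions = 3.
for old in range(6):
    print(old, "->", repartition(old, 3))
```

With 3 target partitions, source partitions 0 to 5 map to 0, 1, 2, 0, 1, 2, so records that shared a partition in the backup still share one after the restore.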
You will need cargo-component to build a WebAssembly component:
$ cargo install cargo-component
Then, create a new library project and add the serde and serde_yaml crates to de-serialize the plugin’s configuration.
$ cargo component new acme-repartitioning --lib
$ cd acme-repartitioning
$ cargo add serde --features derive
$ cargo add serde_yaml
Next, remove the template files in the wit/ folder and copy types.wit and restore.wit into it instead.
Also, edit the Cargo.toml file to match the package of the WIT files:
[package]
name = "acme-repartitioning"
version = "0.1.0"
edition = "2021"

# The serde and serde_yaml entries added by `cargo add` also appear here.
[dependencies]
wit-bindgen-rt = { version = "0.29.0", features = ["bitflags"] }

[lib]
crate-type = ["cdylib"]

[profile.release]
codegen-units = 1
opt-level = "s"
debug = false
strip = true
lto = true

[package.metadata.component]
# Replaces the generated value, "component:acme-repartitioning".
package = "kannika:wasi-plugins"

[package.metadata.component.dependencies]
Your project should look like this:
├── Cargo.lock
├── Cargo.toml
├── src
│   └── lib.rs
└── wit
    ├── restore.wit
    └── types.wit
And here’s the implementation of the plugin:
use std::num::NonZeroU64;
use std::sync::OnceLock;

// The bindings are generated by `cargo-component`
#[allow(warnings)]
mod bindings;
use bindings::kannika::wasi_plugins::types::{KafkaRecord, Outcome};
use bindings::{Context, Guest, TargetTopicRequirements};

/// This struct represents our plugin.
struct MyPlugin;

/// The plugin's configuration
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct MyCfg {
    /// Number of partitions
    no_partitions: NonZeroU64,
}

impl Default for MyCfg {
    fn default() -> Self {
        Self { no_partitions: NonZeroU64::MIN }
    }
}

/// The plugin's configuration.
/// Set during initialization when `init()` is called.
static CFG: OnceLock<MyCfg> = OnceLock::new();

/// Helper function to retrieve the configuration.
fn get_cfg() -> &'static MyCfg {
    CFG.get().expect("init() should have been called")
}

// The `Guest` trait is found in the generated bindings.
// It is the rust-facing interface for the `wasi-restore-plugin` "world".
impl Guest for MyPlugin {
    /// This function is the first one called. It is called just after the wasm
    /// file has been loaded.
    ///
    /// The `Context` struct gives us some information about the restore job
    /// initializing the plugin such as the target topic's name.
    ///
    /// If the plugin's `spec` field was defined in the Restore resource, then
    /// `cfg` will hold that configuration as a yaml string that must be de-serialized
    /// into a `MyCfg`.
    fn init(_ctx: Context, cfg: Option<String>) -> Result<(), String> {
        // If configuration is provided, try to de-serialize it.
        let cfg = cfg
            .as_deref()
            .map(serde_yaml::from_str::<MyCfg>)
            .transpose()
            .map_err(|err| err.to_string())?;
        CFG.get_or_init(|| cfg.unwrap_or_default());
        Ok(())
    }

    /// This is called after init(), but before a Restore starts.
    ///
    /// It gives a chance to the plugin to inform the Restore job about its
    /// requirements regarding the target topic. If the target topic fails to
    /// meet them, the restore will fail during the 'preflight checks' phase.
    fn update_target_topic_requirements(
        mut req: TargetTopicRequirements,
    ) -> TargetTopicRequirements {
        req.no_partitions = get_cfg().no_partitions.get();
        req
    }

    /// This is called for every record of a backup.
    ///
    /// The plugin must decide whether to accept the record, reject it, or
    /// return an error. The record may be modified by this function.
    ///
    /// For more information on the `Outcome` being returned, see the
    /// types.wit file.
    fn transform(mut rec: KafkaRecord) -> Outcome {
        let no_partitions = get_cfg().no_partitions.get();
        let partition = rec.partition.unwrap();
        rec.partition = Some(partition % (no_partitions as i32));
        rec.offset = None;
        Outcome::Accepted(rec)
    }
}

bindings::export!(MyPlugin with_types_in bindings);
All that’s left to do is build the component:
cargo component build --release
The compiled wasm file will be found in the target/wasm32-wasip1/release/ folder, and it can be loaded into a Restore as described above:
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: plugin-restore
spec:
  enabled: true
  source: "my-kafka-cluster"
  sink: "my-bucket"
  config:
    plugins:
      - wasiPluginRef:
          witVersion: "0.1"
          claimName: plugins-store-pvc
          path: acme_repartitioning.wasm
        spec:
          noPartitions: 3
    topics:
      - target: "bar"
        source: "foo"
Enjoy!
In Python
In this example, we’re going to build a plugin meant to be used in a Backup that will remove a particular header from records before they are saved to the storage.
Let’s start by creating a dev environment for our plugin:
$ mkdir off-with-their-headers
$ cd off-with-their-headers
$ python -m venv venv
$ source venv/bin/activate
We will need pyyaml to read the configuration:
$ pip install pyyaml
Then let’s install componentize-py.
This tool will convert our Python code into a WebAssembly component.
$ pip install componentize-py
Collecting componentize-py
  Obtaining dependency information for componentize-py from https://files.pythonhosted.org/packages/f8/da/33010c66e6ad7827d783e91415cba9efe677d875524d525fe39dfccc028f/componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl.metadata
  Using cached componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl.metadata (4.0 kB)
Using cached componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl (40.3 MB)
Installing collected packages: componentize-py
Successfully installed componentize-py-0.13.5
When that’s done, copy the backup.wit and types.wit files to a folder called wit/ and generate Python code from them:
$ componentize-py --wit-path wit/ --world wasi-backup-plugin bindings .
We can now write our plugin’s implementation in a file named plugin.py, matching the module name passed to componentize-py in the compile step:
from typing import Optional

import wasi_backup_plugin
from wasi_backup_plugin import Context
from wasi_backup_plugin.types import *
from wasi_backup_plugin.imports import types

import yaml

# Name of the header to strip; set by init() when a configuration is provided.
header_to_remove : Optional[str] = None

class WasiBackupPlugin(wasi_backup_plugin.WasiBackupPlugin):
    def init(self, ctx: Context, cfg: Optional[str]) -> Optional[str]:
        global header_to_remove
        if cfg is not None:
            # cfg is the plugin's `spec` field, forwarded as a YAML string.
            cfg = yaml.safe_load(cfg)
            header_to_remove = cfg.get('headerToRemove')
        return None

    def transform(self, rec: types.KafkaRecord) -> types.Outcome:
        if header_to_remove is not None:
            # Keep every header except the one we were asked to remove.
            rec.headers = [hdr for hdr in rec.headers if hdr.key != header_to_remove]
        return types.Outcome_Accepted(rec)
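Because the filtering step is plain Python, it can be exercised outside the Wasm toolchain before compiling. The sketch below is an assumption-laden stand-in: the Header dataclass and the strip_header function are hypothetical, and only the key attribute is taken from the plugin code above (the value field is a guess at the shape of the generated type).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Header:
    """Hypothetical stand-in for the generated header type; only `key` is used."""
    key: str
    value: bytes


def strip_header(headers: List[Header], header_to_remove: Optional[str]) -> List[Header]:
    """Return headers without the entries whose key equals header_to_remove."""
    if header_to_remove is None:
        # No configuration was provided: leave the headers untouched.
        return headers
    return [hdr for hdr in headers if hdr.key != header_to_remove]


headers = [Header("secret", b"hunter2"), Header("trace-id", b"abc")]
print([h.key for h in strip_header(headers, "secret")])  # ['trace-id']
print([h.key for h in strip_header(headers, None)])      # ['secret', 'trace-id']
```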
We can now compile our plugin to a Wasm component:
componentize-py --wit-path wit/ --world wasi-backup-plugin componentize plugin -o plugin.wasm
The plugin is ready to be used in a Backup to have it remove the header called ‘secret’, before the data is sent to the storage:
apiVersion: kannika.io/v1alpha
kind: Backup
metadata:
  name: backup-example
spec:
  enabled: true
  source: "my-kafka-cluster"
  sink: "my-bucket"
  plugins:
    - wasiPluginRef:
        witVersion: "0.1"
        claimName: plugins-store-pvc
        path: plugin.wasm
      spec:
        # This is the configuration passed to the plugin in the
        # init(ctx, cfg) function:
        headerToRemove: "secret"
  streams:
    - topic: "foo"
That’s all there is to it!
In Go
At this time, the tooling in Go is not as easy to use as what is available in Rust or Python, so the process is a little bit more involved.
Here’s a short guide to get you started. We’re going to implement a ‘pass-through’ plugin that does nothing. It is meant to serve as a template for a more complex plugin.
We will need:
- the TinyGo toolchain to build a wasm file;
- the wit-bindgen tool to generate bindings from the WIT files;
- the wasm-tools to convert the wasm code into a wasi component.
Linux users will most likely find TinyGo in their distro’s packages.
If you’re a Rust user, the wit-bindgen and wasm-tools can be installed through cargo.
Otherwise, both projects offer pre-compiled binaries in their respective GitHub Releases section.
We will then need to copy two libraries into our project’s directory:
- the Binaryen library required by TinyGo when compiling to Wasm;
- a library to be included in our plugin, wasi_snapshot_preview1.reactor.wasm, provided by the wasmtime project.
Start by initializing your project:
$ mkdir go-passthru
$ cd go-passthru
$ go mod init go-passthru
Then, download the libraries:
$ curl -sLo wasi_snapshot_preview1.reactor.wasm https://github.com/bytecodealliance/wasmtime/releases/download/v23.0.2/wasi_snapshot_preview1.reactor.wasm
$ curl -sLo binaryen-version_118-x86_64-linux.tar.gz https://github.com/WebAssembly/binaryen/releases/download/version_118/binaryen-version_118-x86_64-linux.tar.gz
$ tar -xf binaryen-version_118-x86_64-linux.tar.gz
Copy the restore.wit and types.wit files to a folder named wit/ and generate bindings using wit-bindgen:
$ wit-bindgen tiny-go wit/ --world wasi-restore-plugin --out-dir=gen
From those generated bindings, we can copy and implement the plugin’s interface:
package main

import (
	. "go-passthru/gen"
)

type PluginImpl struct{}

func (i PluginImpl) Init(ctx WasiRestorePluginContext, cfg Option[string]) Result[struct{}, string] {
	return Ok[struct{}, string](struct{}{})
}

func (i PluginImpl) UpdateTargetTopicRequirements(req WasiRestorePluginTargetTopicRequirements) WasiRestorePluginTargetTopicRequirements {
	return req
}

func (i PluginImpl) Transform(rec WasiRestorePluginKafkaRecord) WasiRestorePluginOutcome {
	return KannikaWasiPlugins0_1_0_TypesOutcomeAccepted(rec)
}

// To enable our component to be a library, implement the component in the
// `init` function which is always called first when a Go package is run.
func init() {
	plugin := PluginImpl{}
	SetWasiRestorePlugin(plugin)
}

// main is required for the `wasi` target, even if it isn't used.
func main() {}
Almost there! We can now build our wasm file:
$ export WASMOPT="$(pwd)/binaryen-version_118/bin/wasm-opt"
$ tinygo build -no-debug -o plugin.wasm -target wasi plugin.go
You should now have a plugin.wasm file, but it can’t be used as-is by Kannika Armory.
You must convert it to a component using wasm-tools:
$ wasm-tools component embed -o plugin.embed.wasm --world wasi-restore-plugin ./wit plugin.wasm
$ wasm-tools component new -o plugin.component.wasm --adapt wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm plugin.embed.wasm
The resulting plugin.component.wasm may now be used by a Restore.
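If you rebuild often, the two wasm-tools steps can be wrapped in a small script. The following Python sketch only assembles and runs the same command lines shown in this guide; the component_commands and build_component helpers are hypothetical names, and the script assumes wasm-tools is on your PATH and that the file layout matches the one used here.

```python
import subprocess
from typing import List

# Paths and world name taken from the commands in this guide.
WIT_DIR = "./wit"
WORLD = "wasi-restore-plugin"
ADAPTER = "./wasi_snapshot_preview1.reactor.wasm"


def component_commands(core_wasm: str = "plugin.wasm") -> List[List[str]]:
    """Return the two wasm-tools invocations from this guide as argv lists."""
    stem = core_wasm[: -len(".wasm")] if core_wasm.endswith(".wasm") else core_wasm
    embedded = f"{stem}.embed.wasm"
    component = f"{stem}.component.wasm"
    return [
        # Step 1: embed the WIT world into the core wasm module.
        ["wasm-tools", "component", "embed", "-o", embedded,
         "--world", WORLD, WIT_DIR, core_wasm],
        # Step 2: wrap the module into a component, adapting WASI preview 1.
        ["wasm-tools", "component", "new", "-o", component,
         "--adapt", f"wasi_snapshot_preview1={ADAPTER}", embedded],
    ]


def build_component(core_wasm: str = "plugin.wasm") -> None:
    """Run both steps in order, stopping at the first failure."""
    for argv in component_commands(core_wasm):
        subprocess.run(argv, check=True)


if __name__ == "__main__":
    # Print the commands instead of running them, as a dry run.
    for argv in component_commands():
        print(" ".join(argv))
```

Calling build_component() after the tinygo build step produces plugin.component.wasm in the current directory, exactly as the manual commands do.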
Glad you made it!