Skip to content

WASI plugins

Kannika Armory supports a plugin system built on top of the WebAssemly System Interface (WASI) and the WebAssembly Component Model.

A WASI plugin is a single WebAssembly (.wasm) file that must be made accessible to the operator using a Persistent Volume Claim (PVC). Once that PVC has been created, you can reference it in a Backup or a Restore by defining a wasiPluginRef in spec.plugins:

apiVersion: kannika.io/v1alpha
kind: Backup
metadata:
name: backup-example
spec:
enabled: true
source: "my-kafka-cluster"
sink: "my-bucket"
plugins:
- wasiPluginRef:
witVersion: "0.1"
claimName: plugins-store-pvc
path: my_plugin.wasm
spec:
truth: "Un tiens vaut mieux que deux tu l'auras"
streams:
- topic: "foo"

In this example, We’ve listed a wasiPluginRef entry in the list of enabled plugins for a Backup.

A wasiPluginRef has some mandatory properties:

  • The witVersion tells the engine we are using version 0.1 of the wit definition files (see below).
  • A claimName that references a persistent volume claim where plugins are stored.
  • A path that tells the Backup where the plugin is located within the persistent volume.

An optional configuration can be added using the spec field. It must be a yaml object that will be forwarded as a string to the plugin’s initialization function.

As always, a topicSelector can be used to enable the plugin on only a subset of topics.

Restore resources accept the same kind of configuration.

If you wish to write your own plugins, then you must implement the interface Kannika Armory expects.

That interface is defined using the Wasm Interface Type language. The latest WIT files for Kannika Armory are located here: https://github.com/kannika-io/armory-plugins-wit

Backups and Restores have slightly different needs so each have their own interface (or world in WIT lingo). Depending on the kind of plugin you wish to develop, you will have to implement one or the other.

Below are some guides to get you started in one of these programming languages:

Here’s an example on how to write a repartitioning plugin to be used with a Restore in Rust. This plugin will accept a number of partitions, no_partition as a configuration option and will simply overwrite the original partition of the record with that logic:

new_partition = old_partition % no_partition

You will need cargo component to build a WebAssembly component:

Terminal window
$ cargo install cargo-component

Then, create a new library project and add the serde and serde_yaml crates to de-serialize the plugin’s configuration.

Terminal window
$ cargo component new acme-repartitioning --lib
$ cd acme-repartitioning
$ cargo add serde --features derive
$ cargo add serde_yaml

Next, remove the template files in the wit/ folder and copy types.wit and restore.wit into it instead.

Also, edit the Cargo.toml file to match the package of the WIT files:

[package]
name = "acme-repartitioning"
version = "0.1.0"
edition = "2021"
[dependencies]
wit-bindgen-rt = { version = "0.29.0", features = ["bitflags"] }
[lib]
crate-type = ["cdylib"]
[profile.release]
codegen-units = 1
opt-level = "s"
debug = false
strip = true
lto = true
[package.metadata.component]
package = "component:acme-repartitioning"
package = "kannika:wasi-plugins"
[package.metadata.component.dependencies]

Your project should look like this:

├── Cargo.lock
├── Cargo.toml
├── src
│   └── lib.rs
└── wit
├── restore.wit
└── types.wit

And here’s the implementation of the plugin:

src/lib.rs
use std::num::NonZeroU64;
use std::sync::OnceLock;
// The bindings are generated by `cargo-component`
#[allow(warnings)]
mod bindings;
use bindings::kannika::wasi_plugins::types::{KafkaRecord, Outcome};
use bindings::{Context, Guest, TargetTopicRequirements};
/// This struct represents our plugin.
struct MyPlugin;
/// The plugin's configuration
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct MyCfg {
/// Number of partitions
no_partitions: NonZeroU64,
}
impl Default for MyCfg {
fn default() -> Self {
Self { no_partitions: NonZeroU64::MIN }
}
}
/// The plugin's configuration.
/// Set during initialization when `init()` is called.
static CFG: OnceLock<MyCfg> = OnceLock::new();
/// Helper function to retrieve the configuration.
fn get_cfg() -> &'static MyCfg {
CFG.get().expect("init() should have been called")
}
// The `Guest` trait is found in the generated bindings.
// It is the rust-facing interface for the `wasi-restore-plugin` "world".
impl Guest for MyPlugin {
/// This function is the first one called. It is called just after the wasm
/// file has been loaded.
///
/// The `Context` struct gives us some information about the restore job
/// initializing the plugin such as the target topic's name.
///
/// If the plugin's `spec` field was defined in the Restore resource, then
/// `cfg` will hold that configuration as a yaml string that must be de-serialized
/// into a `MyCfg`.
fn init(_ctx: Context, cfg: Option<String>) -> Result<(), String> {
// If configuration is provided, try to de-serialize it.
let cfg = cfg
.as_deref()
.map(serde_yaml::from_str::<MyCfg>)
.transpose()
.map_err(|err| err.to_string())?;
CFG.get_or_init(|| cfg.unwrap_or_default());
Ok(())
}
/// This is called after init(), but before a Restore starts.
///
/// It gives a chance to the plugin to inform the Restore job about its
/// requirements regarding the target topic. If the target topic fails to
/// meet them, the restore will fail during the 'preflight checks' phase.
fn update_target_topic_requirements(
mut req: TargetTopicRequirements,
) -> TargetTopicRequirements {
req.no_partitions = get_cfg().no_partitions.get();
req
}
/// This is called for every record of a backup.
///
/// The plugin must decide whether to accept the record, reject it, or
/// return an error. The record may be modified by this function.
///
/// For more information on the `Outcome` being returned, see the
/// types.wit file.
fn transform(mut rec: KafkaRecord) -> Outcome {
let no_partitions = get_cfg().no_partitions.get();
let partition = rec.partition.unwrap();
rec.partition = Some(partition % (no_partitions as i32));
rec.offset = None;
Outcome::Accepted(rec)
}
}
bindings::export!(MyPlugin with_types_in bindings);

All that’s left to do is build the component:

Terminal window
cargo component build --release

The compiled wasm file will be found in the target/wasm32-wasip1/release/ folder. and it can be loaded into a Restore as described above:

apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
name: plugin-restore
spec:
enabled: true
source: "my-kafka-cluster"
sink: "my-bucket"
config:
plugins:
- wasiPluginRef:
witVersion: "0.1"
claimName: plugins-store-pvc
path: acme_repartitioning.wasm
spec:
noPartitions: 3
topics:
- target: "bar"
source: "foo"

Enjoy!

In this example, we’re going to build a plugin meant to be used in a Backup that will remove a particular header from records before they are saved to the storage.

Let’s start by creating a dev environment for our plugin:

Terminal window
$ mkdir off-with-their-headers
$ cd off-with-their-headers
$ python -m venv venv
$ source venv/bin/activate

We will need pyyaml to read the configuration:

Terminal window
$ pip install pyyaml

Then let’s install componentize-py. This tool will convert our python code into a WebAssembly component.

Terminal window
$ pip install componentize-py
Collecting componentize-py
Obtaining dependency information for componentize-py from https://files.pythonhosted.org/packages/f8/da/33010c66e6ad7827d783e91415cba9efe677d875524d525fe39dfccc028f/componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl.metadata
Using cached componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl.metadata (4.0 kB)
Using cached componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl (40.3 MB)
Installing collected packages: componentize-py
Successfully installed componentize-py-0.13.5

When that’s done, copy the backup.wit and types.wit files to a folder called wit/ and generate python code from them:

Terminal window
$ componentize-py --wit-path wit/ --world wasi-backup-plugin bindings .

We can now write our plugin’s implementation:

plugin.py
import wasi_backup_plugin
from wasi_backup_plugin import Context
from wasi_backup_plugin.types import *
from wasi_backup_plugin.imports import types
import yaml
header_to_remove : Optional[str] = None
class WasiBackupPlugin(wasi_backup_plugin.WasiBackupPlugin):
def init(self, ctx: Context, cfg: Optional[str]) -> Optional[str]:
global header_to_remove
if cfg is not None:
cfg = yaml.safe_load(cfg)
header_to_remove = cfg.get('headerToRemove')
return None
def transform(self, rec: types.KafkaRecord) -> types.Outcome:
if header_to_remove is not None:
rec.headers = [hdr for hdr in rec.headers if hdr.key != header_to_remove]
return types.Outcome_Accepted(rec)

We now can compile our plugin to a Wasm component:

Terminal window
componentize-py --wit-path wit/ --world wasi-backup-plugin componentize plugin -o plugin.wasm

The plugin is ready to be used in a Backup to have it remove the header called ‘secret’, before the data is sent to the storage:

apiVersion: kannika.io/v1alpha
kind: Backup
metadata:
name: backup-example
spec:
enabled: true
source: "my-kafka-cluster"
sink: "my-bucket"
plugins:
- wasiPluginRef:
witVersion: "0.1"
claimName: plugins-store-pvc
path: plugin.wasm
spec:
# This si the configuration passed to the plugin in the
# init(ctx, cfg) function:
headerToRemove: "secret"
streams:
- topic: "foo"

That’s all there is to it !

At this time, the tooling in Go is not as easy to use as what is available in Rust or Python, so the process is a little bit more involved.

Here’s a short guide to get you started. We’re going to implement a ‘pass-through’ plugin that does nothing. It is meant to serve as a template for a more complex plugin.

We will need:

Linux users will most likely find TinyGo in their distro’s packages.

If you’re a Rust user, the wit-bindgen and wasm-tools can be installed through cargo. Otherwise, both projects offer pre-compiled binaries in their respective GitHub Releases section.

We will then need to copy two libraries in our project’s directory:

  • the Binaryen library required by TinyGo when compiling to Wasm;
  • a library to be included in our plugin, wasi_snapshot_preview1.reactor.wasm, provided by the wasmtime project

Start by initializing your project:

Terminal window
$ mkdir go-passthru
$ cd go-passthru
$ go mod init go-passthru

Then, download the libraries:

Terminal window
$ curl -sLo wasi_snapshot_preview1.reactor.wasm https://github.com/bytecodealliance/wasmtime/releases/download/v23.0.2/wasi_snapshot_preview1.reactor.wasm
$ curl -sLo binaryen-version_118-x86_64-linux.tar.gz https://github.com/WebAssembly/binaryen/releases/download/version_118/binaryen-version_118-x86_64-linux.tar.gz
$ tar -xf binaryen-version_118-x86_64-linux.tar.gz

Copy the restore.wit and types.wit files to a folder named wit/ and generate bindings using wit-bindgen:

Terminal window
$ wit-bindgen tiny-go wit/ --world wasi-restore-plugin --out-dir=gen

From those generated bindings, we can copy and implement the plugin’s interface:

plugin.go
package main
import (
. "go-passthru/gen"
)
type PluginImpl struct {
}
func (i PluginImpl) Init(ctx WasiRestorePluginContext, cfg Option[string]) Result[struct{}, string] {
return Ok[struct{}, string](struct{}{})
}
func (i PluginImpl) UpdateTargetTopicRequirements(req WasiRestorePluginTargetTopicRequirements) WasiRestorePluginTargetTopicRequirements {
return req
}
func (i PluginImpl) Transform(rec WasiRestorePluginKafkaRecord) WasiRestorePluginOutcome {
return KannikaWasiPlugins0_1_0_TypesOutcomeAccepted(rec)
}
// To enable our component to be a library, implement the component in the
// `init` function which is always called first when a Go package is run.
func init() {
plugin := PluginImpl{}
SetWasiRestorePlugin(plugin)
}
// main is required for the `wasi` target, even if it isn't used.
func main() {}

Almost there ! We can now build our wasm file:

Terminal window
$ export WASMOPT="$(pwd)/binaryen-version_118/bin/wasm-opt"
$ tinygo build -no-debug -o plugin.wasm -target wasi plugin.go

You should now have a plugin.wasm file, but it can’t be used as-is by Kannika Armory. You must convert it to a component using wasm-tools:

Terminal window
$ wasm-tools component embed -o plugin.embed.wasm --world wasi-restore-plugin ./wit plugin.wasm
$ wasm-tools component new -o plugin.component.wasm --adapt wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm plugin.embed.wasm

The resulting plugin.component.wasm may now be used by a Restore.

Glad you made it !