    WASI plugins

    Kannika Armory supports a plugin system built on top of the WebAssembly System Interface (WASI) and the WebAssembly Component Model.

    Using WASI plugins

    A WASI plugin is a single WebAssembly (.wasm) file that must be made accessible to the operator using a Persistent Volume Claim (PVC). Once that PVC has been created, you can reference it in a Backup or a Restore by defining a wasiPluginRef in spec.plugins:

    apiVersion: kannika.io/v1alpha
    kind: Backup
    metadata:
      name: backup-example
    spec:
      enabled: true
      source: "my-kafka-cluster"
      sink: "my-bucket"
      plugins:
        - wasiPluginRef:
            witVersion: "0.1"
            claimName: plugins-store-pvc
            path: my_plugin.wasm
          spec:
            truth: "Un tiens vaut mieux que deux tu l'auras"
      streams:
        - topic: "foo"

    In this example, we’ve listed a wasiPluginRef entry in the list of enabled plugins for a Backup.

    A wasiPluginRef has some mandatory properties:

    • The witVersion tells the engine which version of the WIT definition files the plugin uses, here 0.1 (see below).
    • A claimName that references a persistent volume claim where plugins are stored.
    • A path that tells the Backup where the plugin is located within the persistent volume.

    An optional configuration can be added using the spec field. It must be a YAML object; it is forwarded as a string to the plugin’s initialization function.
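
    The rules above can be sketched as a small pre-flight check you might run on a manifest before applying it. This is a hypothetical helper, not part of Kannika Armory. It assumes the manifest has already been parsed into Python dictionaries and that spec sits next to wasiPluginRef in each plugin entry.

```python
from typing import List

# Hypothetical helper: it only mirrors the rules stated in this section.
REQUIRED_FIELDS = ("witVersion", "claimName", "path")


def check_plugin_entry(entry: dict) -> List[str]:
    """Return the problems found in one item of spec.plugins."""
    ref = entry.get("wasiPluginRef")
    if not isinstance(ref, dict):
        return ["missing wasiPluginRef"]

    problems = []
    for field in REQUIRED_FIELDS:
        if field not in ref:
            problems.append(f"wasiPluginRef.{field} is required")

    # The optional configuration must be an object (a mapping).
    # Assumption: `spec` is a sibling of `wasiPluginRef`.
    spec = entry.get("spec")
    if spec is not None and not isinstance(spec, dict):
        problems.append("spec must be a YAML object (mapping)")
    return problems


entry = {
    "wasiPluginRef": {
        "witVersion": "0.1",
        "claimName": "plugins-store-pvc",
        "path": "my_plugin.wasm",
    },
    "spec": {"truth": "Un tiens vaut mieux que deux tu l'auras"},
}
print(check_plugin_entry(entry))  # []
```

    An empty list means the entry satisfies the mandatory properties listed above. The operator performs its own validation, which may be stricter.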

    As always, a topicSelector can be used to enable the plugin on only a subset of topics.

    Restore resources accept the same kind of configuration.

    Writing a WASI plugin

    If you wish to write your own plugins, then you must implement the interface Kannika Armory expects.

    That interface is defined using the Wasm Interface Type language. The latest WIT files for Kannika Armory are located here: https://github.com/kannika-io/armory-plugins-wit

    Backups and Restores have slightly different needs, so each has its own interface (or world in WIT lingo). Depending on the kind of plugin you wish to develop, you will have to implement one or the other.

    Below are guides to get you started in Rust, Python, or Go:

    In Rust

    Here’s an example of how to write a repartitioning plugin in Rust, to be used with a Restore. This plugin accepts a number of partitions, noPartitions, as a configuration option and overwrites the original partition of each record using this logic:

    new_partition = old_partition % noPartitions
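
    As a quick worked example of the modulo rule above, here is a minimal sketch in plain Python (not the plugin itself). The function name remap_partition is only illustrative; the example assumes a backup taken from a 6-partition topic and restored with 3 target partitions.

```python
def remap_partition(old_partition: int, no_partitions: int) -> int:
    """Apply the repartitioning rule: old_partition modulo no_partitions."""
    if no_partitions <= 0:
        # A zero partition count would divide by zero, so it is rejected.
        raise ValueError("no_partitions must be a positive integer")
    return old_partition % no_partitions


# Records from a 6-partition source topic restored to a 3-partition target.
mapping = {old: remap_partition(old, 3) for old in range(6)}
print(mapping)  # {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2}
```

    Source partitions 0 and 3 end up together in target partition 0, 1 and 4 in partition 1, and 2 and 5 in partition 2.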

    You will need cargo-component to build a WebAssembly component:

    Terminal window
    $ cargo install cargo-component

    Then, create a new library project and add the serde and serde_yaml crates to de-serialize the plugin’s configuration.

    Terminal window
    $ cargo component new acme-repartitioning --lib
    $ cd acme-repartitioning
    $ cargo add serde --features derive
    $ cargo add serde_yaml

    Next, remove the template files in the wit/ folder and copy types.wit and restore.wit into it instead.

    Also, edit the Cargo.toml file to match the package of the WIT files:

    [package]
    name = "acme-repartitioning"
    version = "0.1.0"
    edition = "2021"
    [dependencies]
    wit-bindgen-rt = { version = "0.29.0", features = ["bitflags"] }
    [lib]
    crate-type = ["cdylib"]
    [profile.release]
    codegen-units = 1
    opt-level = "s"
    debug = false
    strip = true
    lto = true
    [package.metadata.component]
    package = "kannika:wasi-plugins"
    [package.metadata.component.dependencies]

    Your project should look like this:

    ├── Cargo.lock
    ├── Cargo.toml
    ├── src
    │   └── lib.rs
    └── wit
        ├── restore.wit
        └── types.wit

    And here’s the implementation of the plugin:

    src/lib.rs
    use std::num::NonZeroU64;
    use std::sync::OnceLock;
    // The bindings are generated by `cargo-component`
    #[allow(warnings)]
    mod bindings;
    use bindings::kannika::wasi_plugins::types::{KafkaRecord, Outcome};
    use bindings::{Context, Guest, TargetTopicRequirements};
    /// This struct represents our plugin.
    struct MyPlugin;
    /// The plugin's configuration
    #[derive(serde::Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct MyCfg {
        /// Number of partitions
        no_partitions: NonZeroU64,
    }
    impl Default for MyCfg {
        fn default() -> Self {
            Self { no_partitions: NonZeroU64::MIN }
        }
    }
    /// The plugin's configuration.
    /// Set during initialization when `init()` is called.
    static CFG: OnceLock<MyCfg> = OnceLock::new();
    /// Helper function to retrieve the configuration.
    fn get_cfg() -> &'static MyCfg {
        CFG.get().expect("init() should have been called")
    }
    // The `Guest` trait is found in the generated bindings.
    // It is the rust-facing interface for the `wasi-restore-plugin` "world".
    impl Guest for MyPlugin {
        /// This function is the first one called. It is called just after the wasm
        /// file has been loaded.
        ///
        /// The `Context` struct gives us some information about the restore job
        /// initializing the plugin such as the target topic's name.
        ///
        /// If the plugin's `spec` field was defined in the Restore resource, then
        /// `cfg` will hold that configuration as a yaml string that must be de-serialized
        /// into a `MyCfg`.
        fn init(_ctx: Context, cfg: Option<String>) -> Result<(), String> {
            // If configuration is provided, try to de-serialize it.
            let cfg = cfg
                .as_deref()
                .map(serde_yaml::from_str::<MyCfg>)
                .transpose()
                .map_err(|err| err.to_string())?;
            CFG.get_or_init(|| cfg.unwrap_or_default());
            Ok(())
        }
        /// This is called after init(), but before a Restore starts.
        ///
        /// It gives a chance to the plugin to inform the Restore job about its
        /// requirements regarding the target topic. If the target topic fails to
        /// meet them, the restore will fail during the 'pre-flight checks' phase.
        fn update_target_topic_requirements(
            mut req: TargetTopicRequirements,
        ) -> TargetTopicRequirements {
            req.no_partitions = get_cfg().no_partitions.get();
            req
        }
        /// This is called for every record of a backup.
        ///
        /// The plugin must decide whether to accept the record, reject it, or
        /// return an error. The record may be modified by this function.
        ///
        /// For more information on the `Outcome` being returned, see the
        /// types.wit file.
        fn transform(mut rec: KafkaRecord) -> Outcome {
            let no_partitions = get_cfg().no_partitions.get();
            let partition = rec.partition.unwrap();
            rec.partition = Some(partition % (no_partitions as i32));
            rec.offset = None;
            Outcome::Accepted(rec)
        }
    }
    bindings::export!(MyPlugin with_types_in bindings);

    All that’s left to do is build the component:

    Terminal window
    $ cargo component build --release

    The compiled wasm file will be found in the target/wasm32-wasip1/release/ folder, and it can be loaded into a Restore as described above:

    apiVersion: kannika.io/v1alpha
    kind: Restore
    metadata:
      name: plugin-restore
    spec:
      enabled: true
      source: "my-kafka-cluster"
      sink: "my-bucket"
      config:
        plugins:
          - wasiPluginRef:
              witVersion: "0.1"
              claimName: plugins-store-pvc
              path: acme_repartitioning.wasm
            spec:
              noPartitions: 3
        topics:
          - target: "bar"
            source: "foo"

    Enjoy!

    In Python

    In this example, we’re going to build a plugin meant to be used in a Backup that will remove a particular header from records before they are saved to the storage.

    Let’s start by creating a dev environment for our plugin:

    Terminal window
    $ mkdir off-with-their-headers
    $ cd off-with-their-headers
    $ python -m venv venv
    $ source venv/bin/activate

    We will need pyyaml to read the configuration:

    Terminal window
    $ pip install pyyaml

    Then let’s install componentize-py. This tool will convert our python code into a WebAssembly component.

    Terminal window
    $ pip install componentize-py
    Collecting componentize-py
    Obtaining dependency information for componentize-py from https://files.pythonhosted.org/packages/f8/da/33010c66e6ad7827d783e91415cba9efe677d875524d525fe39dfccc028f/componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl.metadata
    Using cached componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl.metadata (4.0 kB)
    Using cached componentize_py-0.13.5-cp37-abi3-manylinux_2_28_x86_64.whl (40.3 MB)
    Installing collected packages: componentize-py
    Successfully installed componentize-py-0.13.5

    When that’s done, copy the backup.wit and types.wit files to a folder called wit/ and generate python code from them:

    Terminal window
    $ componentize-py --wit-path wit/ --world wasi-backup-plugin bindings .

    We can now write our plugin’s implementation:

    plugin.py
    import wasi_backup_plugin
    from wasi_backup_plugin import Context
    from wasi_backup_plugin.types import *
    from wasi_backup_plugin.imports import types
    import yaml
    header_to_remove : Optional[str] = None
    class WasiBackupPlugin(wasi_backup_plugin.WasiBackupPlugin):
        def init(self, ctx: Context, cfg: Optional[str]) -> Optional[str]:
            global header_to_remove
            if cfg is not None:
                cfg = yaml.safe_load(cfg)
                header_to_remove = cfg.get('headerToRemove')
            return None
        def transform(self, rec: types.KafkaRecord) -> types.Outcome:
            if header_to_remove is not None:
                rec.headers = [hdr for hdr in rec.headers if hdr.key != header_to_remove]
            return types.Outcome_Accepted(rec)
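
    Before compiling, it can be handy to check the filtering logic with a regular Python interpreter. The sketch below does not use the generated bindings: Header and Record are hypothetical stand-ins, and their shapes are only assumed from the plugin code above (a record carries a list of headers, each with a key).

```python
from dataclasses import dataclass, field
from typing import List, Optional


# Stand-ins for the generated binding types (hypothetical shapes).
@dataclass
class Header:
    key: str
    value: bytes = b""


@dataclass
class Record:
    headers: List[Header] = field(default_factory=list)


def strip_header(rec: Record, header_to_remove: Optional[str]) -> Record:
    """Same filtering as transform(), without the Wasm bindings."""
    if header_to_remove is not None:
        rec.headers = [h for h in rec.headers if h.key != header_to_remove]
    return rec


rec = Record(headers=[Header("secret", b"hunter2"), Header("trace-id", b"abc")])
remaining = [h.key for h in strip_header(rec, "secret").headers]
print(remaining)  # ['trace-id']
```

    When no header name is configured (None), the record passes through unchanged, which matches the behaviour of the plugin when no spec is provided.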

    We can now compile our plugin to a Wasm component:

    Terminal window
    $ componentize-py --wit-path wit/ --world wasi-backup-plugin componentize plugin -o plugin.wasm

    The plugin is now ready to be used in a Backup. In this example, it removes the header called ‘secret’ before the data is sent to the storage:

    apiVersion: kannika.io/v1alpha
    kind: Backup
    metadata:
      name: backup-example
    spec:
      enabled: true
      source: "my-kafka-cluster"
      sink: "my-bucket"
      plugins:
        - wasiPluginRef:
            witVersion: "0.1"
            claimName: plugins-store-pvc
            path: plugin.wasm
          spec:
            # This is the configuration passed to the plugin in the
            # init(ctx, cfg) function:
            headerToRemove: "secret"
      streams:
        - topic: "foo"

    That’s all there is to it!

    In Go

    At this time, the tooling in Go is not as easy to use as what is available in Rust or Python, so the process is a little bit more involved.

    Here’s a short guide to get you started. We’re going to implement a ‘pass-through’ plugin that does nothing. It is meant to serve as a template for a more complex plugin.

    We will need:

    • Go
    • TinyGo
    • wit-bindgen
    • wasm-tools

    Linux users will most likely find TinyGo in their distro’s packages.

    If you’re a Rust user, wit-bindgen and wasm-tools can be installed through cargo. Otherwise, both projects offer pre-compiled binaries in their respective GitHub Releases section.

    We will then need to copy two libraries into our project’s directory:

    • the Binaryen library required by TinyGo when compiling to Wasm;
    • a library to be included in our plugin, wasi_snapshot_preview1.reactor.wasm, provided by the wasmtime project.

    Start by initializing your project:

    Terminal window
    $ mkdir go-passthru
    $ cd go-passthru
    $ go mod init go-passthru

    Then, download the libraries:

    Terminal window
    $ curl -sLo wasi_snapshot_preview1.reactor.wasm https://github.com/bytecodealliance/wasmtime/releases/download/v23.0.2/wasi_snapshot_preview1.reactor.wasm
    $ curl -sLo binaryen-version_118-x86_64-linux.tar.gz https://github.com/WebAssembly/binaryen/releases/download/version_118/binaryen-version_118-x86_64-linux.tar.gz
    $ tar -xf binaryen-version_118-x86_64-linux.tar.gz

    Copy the restore.wit and types.wit files to a folder named wit/ and generate bindings using wit-bindgen:

    Terminal window
    $ wit-bindgen tiny-go wit/ --world wasi-restore-plugin --out-dir=gen

    From those generated bindings, we can copy and implement the plugin’s interface:

    plugin.go
    package main
    import (
        . "go-passthru/gen"
    )
    type PluginImpl struct {
    }
    func (i PluginImpl) Init(ctx WasiRestorePluginContext, cfg Option[string]) Result[struct{}, string] {
        return Ok[struct{}, string](struct{}{})
    }
    func (i PluginImpl) UpdateTargetTopicRequirements(req WasiRestorePluginTargetTopicRequirements) WasiRestorePluginTargetTopicRequirements {
        return req
    }
    func (i PluginImpl) Transform(rec WasiRestorePluginKafkaRecord) WasiRestorePluginOutcome {
        return KannikaWasiPlugins0_1_0_TypesOutcomeAccepted(rec)
    }
    // To enable our component to be a library, implement the component in the
    // `init` function which is always called first when a Go package is run.
    func init() {
        plugin := PluginImpl{}
        SetWasiRestorePlugin(plugin)
    }
    // main is required for the `wasi` target, even if it isn't used.
    func main() {}

    Almost there! We can now build our wasm file:

    Terminal window
    $ export WASMOPT="$(pwd)/binaryen-version_118/bin/wasm-opt"
    $ tinygo build -no-debug -o plugin.wasm -target wasi plugin.go

    You should now have a plugin.wasm file, but it can’t be used as-is by Kannika Armory. You must convert it to a component using wasm-tools:

    Terminal window
    $ wasm-tools component embed -o plugin.embed.wasm --world wasi-restore-plugin ./wit plugin.wasm
    $ wasm-tools component new -o plugin.component.wasm --adapt wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm plugin.embed.wasm

    The resulting plugin.component.wasm may now be used by a Restore.
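
    Since the Go build takes three commands, you may want to wrap them in a script. Here is a minimal sketch in Python that reuses the exact commands and file names from this guide. The function names are illustrative, and the script assumes tinygo and wasm-tools are on your PATH and that it runs from the project directory.

```python
import os
import subprocess
from typing import List


def go_plugin_build_commands(world: str = "wasi-restore-plugin") -> List[List[str]]:
    """Return the three commands from the guide, in order, as argv lists."""
    return [
        # 1. Compile the Go code to a core Wasm module.
        ["tinygo", "build", "-no-debug", "-o", "plugin.wasm",
         "-target", "wasi", "plugin.go"],
        # 2. Embed the WIT world into the module.
        ["wasm-tools", "component", "embed", "-o", "plugin.embed.wasm",
         "--world", world, "./wit", "plugin.wasm"],
        # 3. Turn the module into a component using the WASI adapter.
        ["wasm-tools", "component", "new", "-o", "plugin.component.wasm",
         "--adapt", "wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm",
         "plugin.embed.wasm"],
    ]


def run_all(commands: List[List[str]], wasm_opt: str) -> None:
    """Run each command in order, stopping at the first failure."""
    # WASMOPT points TinyGo at the wasm-opt binary, as in the export above.
    env = dict(os.environ, WASMOPT=wasm_opt)
    for argv in commands:
        subprocess.run(argv, check=True, env=env)
```

    Calling run_all(go_plugin_build_commands(), wasm_opt=os.path.abspath("binaryen-version_118/bin/wasm-opt")) would then perform the whole build. Nothing is executed until run_all is called.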

    Glad you made it!