Skip to content

Plugin Development

This guide explains how to build a custom Rivet plugin. Plugins extend Rivet with new engines, catalogs, sources, and sinks by implementing abstract base classes from rivet_core and registering via Python entry points.


Canonical Plugin Structure

All first-party plugins follow this directory layout. Third-party plugins should match it for consistency.

rivet_{name}/
├── __init__.py          # Registration function: core-first, adapters best-effort
├── engine.py            # ComputeEnginePlugin (if applicable)
├── catalog.py           # CatalogPlugin (if applicable)
├── source.py            # SourcePlugin (if applicable)
├── sink.py              # SinkPlugin (if applicable)
├── errors.py            # Error mapping utilities (if applicable, NOT _errors.py)
├── adapters/            # Only if plugin has ComputeEngineAdapters
│   ├── __init__.py
│   ├── s3.py            # Named by the catalog/engine type they bridge
│   ├── glue.py
│   └── unity.py
└── pyproject.toml

Rules:

  • Adapter modules live in an adapters/ subdirectory, never as flat files in the plugin root.
  • Adapter files are named by the catalog or engine type they bridge (e.g., s3.py, glue.py, unity.py, duckdb.py, pyspark.py).
  • If a plugin has no adapters, omit the adapters/ directory entirely.
  • Error modules are named errors.py (not _errors.py).

Canonical __init__.py Registration Pattern

The registration function in __init__.py follows a strict two-phase pattern:

  1. Core components first — catalogs, engines, sources, sinks, and cross-joint adapters are imported and registered without try/except guards. These are always available.
  2. Optional adapters second — each ComputeEngineAdapter is registered in its own try/except ImportError: pass block, since adapters depend on optional third-party packages that may not be installed.
"""rivet_{name} — {Description} plugin for Rivet."""

from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from rivet_core.plugins import PluginRegistry


def {Name}Plugin(registry: PluginRegistry) -> None:
    """Register all rivet_{name} components into the plugin registry.

    Core components (catalog, engine, source, sink) are always registered.
    Cross-catalog adapters are registered best-effort since they depend
    on optional packages.
    """
    # 1. Core components — always registered, no try/except
    from rivet_{name}.engine import {Name}ComputeEnginePlugin

    registry.register_engine_plugin({Name}ComputeEnginePlugin())

    # 2. Optional adapters — each in its own try/except ImportError: pass
    try:
        from rivet_{name}.adapters.s3 import S3{Name}Adapter

        registry.register_adapter(S3{Name}Adapter())
    except ImportError:
        pass

If a plugin has no optional adapters (e.g., rivet_aws), the best-effort section and its docstring explanation are omitted entirely.


Import Boundary Rule

Plugins import only rivet_core public API. A plugin must never import:

  • Another plugin
  • rivet_config
  • rivet_bridge
  • rivet_cli

This boundary is enforced as a build failure, not a warning. It ensures plugins remain decoupled and independently installable.

# ✅ Correct
from rivet_core.plugins import CatalogPlugin, PluginRegistry
from rivet_core.models import Catalog, ComputeEngine, Joint, Material

# ❌ Wrong — never import other plugins or internal packages
from rivet_duckdb.engine import DuckDBComputeEnginePlugin  # no
from rivet_config.manifest import ProjectManifest           # no

Plugin ABCs

All plugin ABCs live in rivet_core.plugins. Each ABC defines the contract a plugin must satisfy.

CatalogPlugin

Connects Rivet to a data store (database, filesystem, object store). Provides validation, instantiation, table reference resolution, and optional introspection.

from abc import ABC, abstractmethod
from typing import Any

from rivet_core.models import Catalog, CatalogNode, ObjectSchema, ObjectMetadata


class CatalogPlugin(ABC):
    """Contract for connecting Rivet to a data store.

    A catalog plugin validates its options, instantiates a ``Catalog``,
    resolves logical joint names to physical table references, and may
    optionally support introspection (listing tables, schemas, metadata).

    Subclasses must implement ``validate``, ``instantiate`` and
    ``default_table_reference``; the remaining methods are optional hooks.
    """

    type: str                              # unique catalog type identifier
    required_options: list[str]            # options that must be present
    optional_options: dict[str, Any]       # options with default values
    credential_options: list[str]          # sensitive options (masked in logs)
    # NOTE(review): the two defaults below are class-level mutable dicts, so
    # they are shared by every subclass that does not assign its own value.
    credential_groups: dict[str, list[str]] = {}  # auth_type → relevant credential options
    env_var_hints: dict[str, str] = {}     # option → environment variable name

    @abstractmethod
    def validate(self, options: dict[str, Any]) -> None:
        """Raise on invalid options."""

    @abstractmethod
    def instantiate(self, name: str, options: dict[str, Any]) -> Catalog:
        """Create a Catalog instance from config."""

    @abstractmethod
    def default_table_reference(self, logical_name: str, options: dict[str, Any]) -> str:
        """Map a logical joint name to a physical table reference."""

    # Not abstract: subclasses inherit this. The body is not shown in this
    # listing; only the documented resolution order is.
    def resolve_table_reference(self, logical_name: str, catalog: Catalog) -> str:
        """Resolve logical table name to physical reference.

        Priority: table_map entry > default_table_reference > logical name passthrough.
        """

    # Optional introspection — override to enable catalog browsing
    # (bodies are elided with ``...`` in this listing).
    def list_tables(self, catalog: Catalog) -> list[CatalogNode]: ...
    def get_schema(self, catalog: Catalog, table: str) -> ObjectSchema: ...
    def test_connection(self, catalog: Catalog) -> None: ...
    def get_metadata(self, catalog: Catalog, table: str) -> ObjectMetadata | None: ...
    def list_children(self, catalog: Catalog, path: list[str]) -> list[CatalogNode]:
        """List immediate children of a path. Default filters list_tables()."""

ComputeEnginePlugin

Creates a compute engine instance and executes SQL against it.

import pyarrow

from rivet_core.models import ComputeEngine


class ComputeEnginePlugin(ABC):
    """Contract for creating a compute engine and executing SQL against it.

    Subclasses must implement ``create_engine``, ``validate`` and
    ``execute_sql``. The remaining members are optional hooks whose defaults
    are shown below (``None`` / ``1``).
    """

    engine_type: str                                    # unique engine type identifier
    supported_catalog_types: dict[str, list[str]]       # catalog_type → capabilities

    @abstractmethod
    def create_engine(self, name: str, config: dict[str, Any]) -> ComputeEngine:
        """Create a ComputeEngine instance from config."""

    @abstractmethod
    def validate(self, options: dict[str, Any]) -> None:
        """Raise on invalid engine options."""

    @abstractmethod
    def execute_sql(
        self,
        engine: ComputeEngine,
        sql: str,
        input_tables: dict[str, pyarrow.Table],
    ) -> pyarrow.Table:
        """Execute SQL and return results as an Arrow table."""

    @property
    def materialization_strategy_name(self) -> str | None:
        """Preferred materialization strategy name, or None for default (arrow)."""
        return None

    # NOTE(review): ReferenceResolver and PluginMetrics are not imported in
    # this snippet — presumably they come from rivet_core; confirm the module.
    def get_reference_resolver(self) -> ReferenceResolver | None:
        """Return a ReferenceResolver for rewriting SQL references, or None."""
        return None

    def collect_metrics(self, execution_context: Any) -> PluginMetrics | None:
        """Collect engine-specific metrics after execution, or None."""
        return None

    @property
    def default_concurrency_limit(self) -> int:
        """Default max fused groups executing in parallel on this engine.

        Override to reflect the backend's natural parallelism (e.g., 4 for
        Databricks SQL warehouses). Users can further override per-engine
        via config["concurrency_limit"] in profiles.yaml.
        """
        # The base default is 1, i.e. one fused group at a time.
        return 1

ComputeEngineAdapter

Bridges an engine and a catalog by dispatching read and write operations. Adapters are registered for a specific (engine_type, catalog_type) pair.

read_dispatch accepts an optional pushdown parameter (PushdownPlan | None) and returns an AdapterPushdownResult containing the materialized data and a ResidualPlan listing any pushdown operations the adapter could not apply. When pushdown is None, the adapter performs a full scan and returns an empty residual.

from rivet_core.plugins import ComputeEngineAdapter
from rivet_core.optimizer import PushdownPlan, AdapterPushdownResult, ResidualPlan, EMPTY_RESIDUAL


class ComputeEngineAdapter(ABC):
    """Contract for bridging one engine type and one catalog type.

    An adapter dispatches read and write operations and is registered for a
    specific (engine_type, catalog_type) pair. Subclasses must implement
    ``read_dispatch`` and ``write_dispatch``; native SQL write is opt-in via
    ``supports_native_sql_write``.
    """

    target_engine_type: str        # engine type this adapter serves
    catalog_type: str              # catalog type this adapter handles
    capabilities: list[str]        # e.g. ["read", "write"]
    source: str                    # "engine_plugin" or "catalog_plugin"
    source_plugin: str | None      # package name, e.g. "rivet_postgres"

    @abstractmethod
    def read_dispatch(
        self,
        engine: Any,
        catalog: Any,
        joint: Any,
        pushdown: PushdownPlan | None = None,
    ) -> AdapterPushdownResult:
        """Read data from catalog through engine.

        When pushdown is provided, the adapter should apply as many operations
        as possible (predicates, projections, limit, casts) natively. Any
        operations that cannot be applied are returned in the residual, and the
        executor applies them post-materialization via PyArrow.

        When pushdown is None, perform a full scan and return EMPTY_RESIDUAL.
        """

    @abstractmethod
    def write_dispatch(self, engine: Any, catalog: Any, joint: Any, material: Any) -> Any:
        """Write materialized data to catalog through engine."""

    def supports_native_sql_write(self, write_strategy: str) -> bool:
        """Whether this adapter supports native SQL write for the given strategy.

        Returns False by default — adapters opt in by overriding.
        When True, the executor passes a NativeSqlWriteContext as the material
        argument to write_dispatch instead of an Arrow-based Material.
        """
        # Base behavior: every strategy goes through the Arrow-based path.
        return False

Native SQL Write Support

When the compute engine and catalog share the same backend, an adapter can opt into native SQL write by overriding supports_native_sql_write(). This lets the executor embed the fused SQL directly into the write DDL, eliminating the Arrow round-trip.

The executor passes a NativeSqlWriteContext (instead of a Material) as the material argument to write_dispatch(). The adapter detects the mode via isinstance:

from rivet_core.plugins import ComputeEngineAdapter, NativeSqlWriteContext

_NATIVE_WRITE_STRATEGIES = frozenset({"replace", "append", "truncate_insert"})


class MyLocalAdapter(ComputeEngineAdapter):
    """Example adapter that opts into native SQL writes.

    For the strategies in ``_NATIVE_WRITE_STRATEGIES`` the fused SQL is
    embedded directly in the write statement; any other material is written
    through the Arrow path.

    ``execute`` and ``self._arrow_write`` are placeholders for the backend's
    statement runner and Arrow writer; this example does not define them.
    """

    target_engine_type = "my_engine"
    catalog_type = "my_catalog"
    capabilities = ["read", "write", "native_sql_write"]
    source = "engine_plugin"
    source_plugin = "rivet_my_plugin"

    def supports_native_sql_write(self, write_strategy: str) -> bool:
        """Return True when ``write_strategy`` can be written natively in SQL."""
        return write_strategy in _NATIVE_WRITE_STRATEGIES

    def write_dispatch(self, engine, catalog, joint, material):
        """Write ``material`` to the catalog.

        Args:
            engine: Compute engine instance.
            catalog: Target catalog instance.
            joint: Joint metadata for the write.
            material: Either a ``NativeSqlWriteContext`` (native SQL mode) or
                an Arrow-based material (fallback mode).

        Returns:
            ``None`` for native SQL writes; otherwise whatever
            ``_arrow_write`` returns.

        Raises:
            ValueError: If a ``NativeSqlWriteContext`` carries a write
                strategy this adapter does not implement.
        """
        if not isinstance(material, NativeSqlWriteContext):
            # Arrow fallback for unsupported strategies
            return self._arrow_write(engine, catalog, joint, material)

        # Native SQL write — embed fused SQL in DDL
        target = material.target_table
        sql = material.fused_sql
        strategy = material.write_strategy

        if strategy == "replace":
            # "replace" must succeed when the target already exists; a plain
            # CREATE TABLE ... AS fails on the second run. Backends without
            # CREATE OR REPLACE TABLE need DROP TABLE IF EXISTS + CREATE instead.
            execute(f"CREATE OR REPLACE TABLE {target} AS {sql}")
        elif strategy == "append":
            execute(f"INSERT INTO {target} {sql}")
        elif strategy == "truncate_insert":
            # NOTE(review): two statements; wrap them in a transaction if the
            # backend supports it so a failed INSERT does not leave the table empty.
            execute(f"DELETE FROM {target}")
            execute(f"INSERT INTO {target} {sql}")
        else:
            # Fail loudly instead of returning None without writing anything.
            raise ValueError(
                f"Unsupported native SQL write strategy: {strategy!r}"
            )
        return None

NativeSqlWriteContext is a frozen dataclass with these fields:

Field Type Description
fused_sql str The fused SQL from the group (CTE chain or final SELECT)
target_table str Target table name in the catalog
write_strategy str Write strategy (replace, append, truncate_insert)
input_tables dict[str, pyarrow.Table] Upstream materialized Arrow tables for registration
engine Any ComputeEngine instance
catalog Any Catalog instance
joint Any Joint metadata

Implementing Pushdown in a Custom Adapter

Each pushdown operation should be wrapped in try/except so that failures degrade gracefully — the failed operation moves to the residual instead of raising an error.

from rivet_core.plugins import ComputeEngineAdapter
from rivet_core.optimizer import (
    PushdownPlan,
    AdapterPushdownResult,
    ResidualPlan,
    EMPTY_RESIDUAL,
)
from rivet_core.models import Material


class MyAdapter(ComputeEngineAdapter):
    """Example adapter showing best-effort pushdown with graceful fallback.

    Each pushdown operation is attempted natively; an operation that fails is
    handed back in the residual plan instead of raising.
    """

    target_engine_type = "my_engine"
    catalog_type = "my_catalog"
    capabilities = ["read", "write"]
    source = "engine_plugin"
    source_plugin = "rivet_my_plugin"

    def read_dispatch(self, engine, catalog, joint, pushdown=None):
        """Read the joint's data, applying as much of ``pushdown`` as possible.

        Args:
            engine: Compute engine instance.
            catalog: Catalog instance to read from.
            joint: Joint metadata describing what to read.
            pushdown: Optional pushdown plan. ``None`` means a full scan.

        Returns:
            An ``AdapterPushdownResult`` holding the materialized data and a
            residual plan with the predicates, limit and casts that were not
            applied here.
        """
        # 1. Read data from source (full scan)
        df = self._read_full(engine, catalog, joint)

        if pushdown is None:
            return AdapterPushdownResult(
                material=Material(materialized_ref=df),
                residual=EMPTY_RESIDUAL,
            )

        residual_predicates = list(pushdown.predicates.residual)
        residual_casts = list(pushdown.casts.residual)
        residual_limit = pushdown.limit.residual_limit

        # 2. Apply predicates; a failed predicate moves to the residual.
        for pred in pushdown.predicates.pushed:
            try:
                df = df.filter(pred.expression)
            except Exception:
                residual_predicates.append(pred)

        # Projection and limit are only safe once every predicate has been
        # applied: anything left in the residual is evaluated after this call.
        all_predicates_applied = not residual_predicates

        # 3. Apply projection. The residual plan built below has no projection
        #    slot, so skipping it only costs performance. It is skipped when
        #    predicates remain because they may reference columns it would drop.
        if all_predicates_applied:
            try:
                if pushdown.projections.pushed_columns:
                    df = df.select(pushdown.projections.pushed_columns)
            except Exception:
                pass  # full columns already present

        # 4. Apply limit. Truncating before a residual predicate runs would
        #    drop rows that match the filter, so defer it in that case.
        pushed_limit = pushdown.limit.pushed_limit
        if pushed_limit is not None:
            if all_predicates_applied:
                try:
                    df = df.head(pushed_limit)
                except Exception:
                    residual_limit = pushed_limit
            else:
                residual_limit = pushed_limit

        # 5. Apply casts; a failed cast moves to the residual.
        for cast in pushdown.casts.pushed:
            try:
                df = df.cast_column(cast.column, cast.to_type)
            except Exception:
                residual_casts.append(cast)

        return AdapterPushdownResult(
            material=Material(materialized_ref=df),
            residual=ResidualPlan(
                predicates=residual_predicates,
                limit=residual_limit,
                casts=residual_casts,
            ),
        )

    def write_dispatch(self, engine, catalog, joint, material):
        """Write path — body omitted in this example."""
        ...

SourcePlugin

Reads data from a catalog into a materialization.

from rivet_core.plugins import SourcePlugin
from rivet_core.models import Catalog, Joint, Material


class SourcePlugin(ABC):
    """Contract for reading data from a catalog into a materialization."""

    # ``pushdown`` has no default here, so callers must pass it (or None).
    @abstractmethod
    def read(self, catalog: Catalog, joint: Joint, pushdown: Any | None) -> Material:
        """Read data from the catalog for the given joint."""

SinkPlugin

Writes materialized data to a catalog.

from rivet_core.plugins import SinkPlugin
from rivet_core.models import Catalog, Joint, Material


class SinkPlugin(ABC):
    """Contract for writing materialized data to a catalog."""

    @abstractmethod
    def write(self, catalog: Catalog, joint: Joint, material: Material, strategy: str) -> None:
        """Write data to the catalog using the specified write strategy."""

CrossJointAdapter

Declares how a consumer engine accesses upstream data at an engine boundary. Registered for a specific (consumer_engine_type, producer_engine_type) pair.

from rivet_core.plugins import CrossJointAdapter, CrossJointContext, UpstreamResolution
from rivet_core.strategies import MaterializedRef


class CrossJointAdapter(ABC):
    """Contract for how a consumer engine accesses upstream data at an engine boundary.

    Registered for a specific (consumer_engine_type, producer_engine_type) pair.
    """

    consumer_engine_type: str  # engine type that reads the upstream data
    producer_engine_type: str  # engine type that produced the upstream data

    @abstractmethod
    def resolve_upstream(
        self,
        producer_ref: MaterializedRef,
        consumer_engine: Any,
        joint_context: CrossJointContext,
    ) -> UpstreamResolution:
        """Resolve how the consumer engine accesses upstream data.

        Returns an UpstreamResolution with strategy:
        - "arrow_passthrough": pass Arrow data directly
        - "native_reference": use a catalog table reference (zero-copy)
        - "unsupported": cannot transfer data between these engines
        """

ReferenceResolver

Rewrites abstract table references into engine-native expressions at compile time. Returned by ComputeEnginePlugin.get_reference_resolver().

from rivet_core.plugins import ReferenceResolver


class ReferenceResolver(ABC):
    """Contract for rewriting abstract table references into engine-native SQL.

    Instances are returned by ``ComputeEnginePlugin.get_reference_resolver()``.
    """

    @abstractmethod
    def resolve_references(
        self,
        sql: str,
        joint: Any,
        catalog: Any,
        compiled_joints: dict[str, Any] | None = None,
        catalog_map: dict[str, Any] | None = None,
        fused_group_joints: list[str] | None = None,
    ) -> str | None:
        """Return rewritten SQL, or None if unchanged.

        fused_group_joints contains joint names in the same fused group
        (CTE aliases that must NOT be replaced with fully-qualified names).
        """

Entry Points and Registration

Rivet discovers plugins at startup via Python entry points. There are two registration patterns:

Single-function registration (recommended): register all components through a single function in the rivet.plugins entry point group. This is the pattern all built-in plugins use.

The entry point resolves to a callable plugin_fn(registry: PluginRegistry) -> None that registers all components at once:

# pyproject.toml
[project.entry-points."rivet.plugins"]
my_plugin = "rivet_my_plugin:MyPlugin"
# rivet_my_plugin/__init__.py
from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from rivet_core.plugins import PluginRegistry


def MyPlugin(registry: PluginRegistry) -> None:
    """Register the catalog and engine plugins of rivet_my_plugin.

    Imports are deferred to call time so the package can be imported without
    loading its component modules.
    """
    from rivet_my_plugin.catalog import MyCatalogPlugin
    from rivet_my_plugin.engine import MyEnginePlugin

    # Catalog first, then engine — each is instantiated right before it is registered.
    catalog_plugin = MyCatalogPlugin()
    registry.register_catalog_plugin(catalog_plugin)

    engine_plugin = MyEnginePlugin()
    registry.register_engine_plugin(engine_plugin)

Granular Registration

Alternatively, register individual components in separate entry point groups. Each entry point resolves to a plugin class that is instantiated and registered automatically:

Entry point group Registration method ABC
rivet.catalogs register_catalog_plugin CatalogPlugin
rivet.compute_engines register_engine_plugin ComputeEnginePlugin
rivet.compute_engine_adapters register_adapter ComputeEngineAdapter
rivet.sources register_source SourcePlugin
rivet.sinks register_sink SinkPlugin
rivet.cross_joint_adapters register_cross_joint_adapter CrossJointAdapter
# pyproject.toml — granular registration
[project.entry-points."rivet.catalogs"]
my_catalog = "rivet_my_plugin.catalog:MyCatalogPlugin"

[project.entry-points."rivet.compute_engines"]
my_engine = "rivet_my_plugin.engine:MyEnginePlugin"

Complete Example: Minimal CatalogPlugin

A SQLite-backed catalog plugin that stores tables in a single database file.

# rivet_sqlite/catalog.py
from __future__ import annotations

import sqlite3
from typing import Any

from rivet_core.models import Catalog, CatalogNode, ObjectSchema
from rivet_core.plugins import CatalogPlugin


class SQLiteCatalogPlugin(CatalogPlugin):
    """Catalog plugin backed by a single SQLite database file.

    The only required option is ``path``, the location of the database file.
    Every introspection call opens its own short-lived connection and closes
    it even when the query raises.
    """

    type = "sqlite"
    required_options = ["path"]
    optional_options: dict[str, Any] = {}
    credential_options: list[str] = []

    def validate(self, options: dict[str, Any]) -> None:
        """Raise ``ValueError`` when the ``path`` option is missing."""
        if "path" not in options:
            raise ValueError("SQLite catalog requires 'path' option")

    def instantiate(self, name: str, options: dict[str, Any]) -> Catalog:
        """Build a ``Catalog`` named ``name`` carrying ``options`` unchanged."""
        return Catalog(name=name, catalog_type=self.type, options=options)

    def default_table_reference(self, logical_name: str, options: dict[str, Any]) -> str:
        """Return ``logical_name`` unchanged as the physical table name."""
        return logical_name

    def list_tables(self, catalog: Catalog) -> list[CatalogNode]:
        """Return one ``CatalogNode`` per table listed in ``sqlite_master``."""
        conn = sqlite3.connect(catalog.options["path"])
        try:
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            )
            return [
                CatalogNode(name=name, path=[name], node_type="table")
                for (name,) in cursor
            ]
        finally:
            # Close even if the query raises, so the file handle is not leaked.
            conn.close()

    def get_schema(self, catalog: Catalog, table: str) -> ObjectSchema:
        """Return the column-name → declared-type mapping of ``table``."""
        conn = sqlite3.connect(catalog.options["path"])
        try:
            # The table-valued form of PRAGMA table_info accepts a bound
            # parameter, so table names containing quotes cannot break the
            # statement.
            cursor = conn.execute(
                "SELECT name, type FROM pragma_table_info(?)", (table,)
            )
            columns = {name: declared_type for name, declared_type in cursor}
        finally:
            conn.close()
        return ObjectSchema(columns=columns)

    def test_connection(self, catalog: Catalog) -> None:
        """Open the database and run a trivial query; raises on failure."""
        conn = sqlite3.connect(catalog.options["path"])
        try:
            conn.execute("SELECT 1")
        finally:
            conn.close()

Complete Example: Minimal ComputeEnginePlugin

A compute engine plugin that uses SQLite for SQL execution.

# rivet_sqlite/engine.py
from __future__ import annotations

import sqlite3
from typing import Any

import pyarrow

from rivet_core.models import ComputeEngine
from rivet_core.plugins import ComputeEnginePlugin


class SQLiteComputeEnginePlugin(ComputeEnginePlugin):
    """Compute engine plugin that runs SQL in an in-memory SQLite database."""

    engine_type = "sqlite"
    supported_catalog_types: dict[str, list[str]] = {
        "sqlite": ["read", "write"],
    }

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQLite identifier."""
        return '"' + name.replace('"', '""') + '"'

    def create_engine(self, name: str, config: dict[str, Any]) -> ComputeEngine:
        """Build a ``ComputeEngine`` named ``name`` carrying ``config`` unchanged."""
        return ComputeEngine(name=name, engine_type=self.engine_type, config=config)

    def validate(self, options: dict[str, Any]) -> None:
        """Accept any options — this engine has no required options."""
        pass  # no required engine options

    def execute_sql(
        self,
        engine: ComputeEngine,
        sql: str,
        input_tables: dict[str, pyarrow.Table],
    ) -> pyarrow.Table:
        """Load ``input_tables`` into a fresh in-memory database and run ``sql``.

        Args:
            engine: Engine instance (not used by this implementation).
            sql: Statement to execute once the inputs are loaded.
            input_tables: Table name → Arrow table. Each one with at least
                one column becomes a SQLite table of the same name.

        Returns:
            The result set as an Arrow table. It has no columns when the
            statement produces no result description.
        """
        conn = sqlite3.connect(":memory:")
        try:
            # Register input tables
            for table_name, arrow_table in input_tables.items():
                columns_by_name = arrow_table.to_pydict()
                if not columns_by_name:
                    continue  # zero-column table: nothing to create

                # Quote identifiers so reserved words and names containing
                # spaces or punctuation do not break the generated SQL.
                quoted_table = self._quote_identifier(table_name)
                column_list = ", ".join(
                    self._quote_identifier(col) for col in columns_by_name
                )
                placeholders = ", ".join("?" for _ in columns_by_name)

                conn.execute(f"CREATE TABLE {quoted_table} ({column_list})")
                # zip(*columns) transposes column lists into row tuples;
                # executemany inserts them in one call.
                conn.executemany(
                    f"INSERT INTO {quoted_table} VALUES ({placeholders})",
                    zip(*columns_by_name.values()),
                )

            cursor = conn.execute(sql)
            columns = [desc[0] for desc in cursor.description] if cursor.description else []
            data = cursor.fetchall()
        finally:
            # Always release the connection, including when the SQL fails.
            conn.close()

        arrays = {
            col: [row[i] for row in data] for i, col in enumerate(columns)
        }
        return pyarrow.table(arrays)

Registering the Plugin

# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "rivet-sqlite"
version = "0.1.0"
description = "SQLite plugin for Rivet"
requires-python = ">=3.11"
dependencies = ["rivet-core>=0.1.0"]

[project.entry-points."rivet.plugins"]
sqlite = "rivet_sqlite:SQLitePlugin"

[tool.hatch.build.targets.wheel]
packages = ["rivet_sqlite"]
# rivet_sqlite/__init__.py
from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from rivet_core.plugins import PluginRegistry


def SQLitePlugin(registry: PluginRegistry) -> None:
    """Register the SQLite catalog and compute engine plugins.

    Imports are deferred to call time so the package can be imported without
    loading its component modules.
    """
    from rivet_sqlite.catalog import SQLiteCatalogPlugin
    from rivet_sqlite.engine import SQLiteComputeEnginePlugin

    # Catalog first, then engine — each is instantiated right before it is registered.
    catalog_plugin = SQLiteCatalogPlugin()
    registry.register_catalog_plugin(catalog_plugin)

    engine_plugin = SQLiteComputeEnginePlugin()
    registry.register_engine_plugin(engine_plugin)

Once installed (pip install -e .), Rivet discovers the plugin automatically — no manual registration required.

Using the Plugin

Configure the plugin in profiles.yaml:

default:
  engines:
    - name: sqlite_engine
      type: sqlite
      catalogs:
        - my_db
  catalogs:
    - name: my_db
      type: sqlite
      options:
        path: data/warehouse.db

Then use it in your pipeline like any other engine and catalog.