Rivet

Declarative SQL pipelines with multi-engine execution, quality checks, and built-in testing. Define once — run anywhere.

pip install 'rivetsql[all]'

What is Rivet?

Rivet separates what to compute from how to compute it and where data lives. Write your pipeline logic once as declarative joints, then run it on DuckDB, Polars, PySpark, or Postgres — without changing a single line.

graph LR
    A[raw_orders<br/><small>source</small>] --> B[clean_orders<br/><small>transform</small>]
    B --> C[orders_summary<br/><small>sink</small>]

    style A fill:#6c63ff,color:#fff,stroke:none
    style B fill:#818cf8,color:#fff,stroke:none
    style C fill:#3b82f6,color:#fff,stroke:none

Multi-Engine

Swap between DuckDB, Polars, PySpark, and Postgres without rewriting pipelines. Adjacent SQL joints on the same engine are automatically fused into a single query.
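The fusion idea can be sketched as plain SQL composition: when one SQL joint feeds directly into another on the same engine, the downstream query can read from the upstream query as a subquery, so the engine sees one statement instead of two materialized steps. This is a minimal sketch with a hypothetical `fuse` helper, not Rivet's actual API:

```python
def fuse(inner_sql: str, outer_sql: str, inner_name: str) -> str:
    """Rewrite outer_sql so it reads inner_sql as an inline subquery.

    Hypothetical sketch of joint fusion: two adjacent SQL joints on the
    same engine collapse into a single query the engine runs in one pass.
    """
    subquery = f"({inner_sql}) AS {inner_name}"
    return outer_sql.replace(f"FROM {inner_name}", f"FROM {subquery}")

clean = "SELECT id, amount FROM raw_orders WHERE amount > 0"
summary = "SELECT COUNT(*) AS n FROM clean_orders"

# One round-trip to the engine instead of materializing clean_orders first.
fused = fuse(clean, summary, "clean_orders")
```

The real planner would of course work on parsed queries rather than strings, but the effect is the same: adjacent same-engine SQL joints cost one query, not two.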

Declarative Pipelines

Define joints in SQL, YAML, or Python. Rivet compiles them into an immutable execution plan — no imperative orchestration code needed.

Quality Checks

Assertions validate data before writes. Audits verify data after writes. Catch bad data before it reaches your warehouse.
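The assert-then-audit flow reduces to two checkpoints around the write. A pure-Python sketch with hypothetical helper names (not Rivet's API):

```python
def assert_rows(rows, predicate, message):
    """Assertion: validate every row BEFORE the write; fail fast on bad data."""
    bad = [r for r in rows if not predicate(r)]
    if bad:
        raise ValueError(f"assertion failed: {message} ({len(bad)} bad rows)")
    return rows

def audit_rowcount(written, expected):
    """Audit: verify the data AFTER the write, e.g. nothing was silently dropped."""
    if written != expected:
        raise ValueError(f"audit failed: wrote {written} rows, expected {expected}")

rows = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": 4.5}]
checked = assert_rows(rows, lambda r: r["amount"] > 0, "amount must be positive")
audit_rowcount(written=len(checked), expected=len(rows))
```

A failing assertion aborts before anything is written; a failing audit flags a write that did not land as expected.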

Built-in Testing

Offline fixture-based tests validate joint logic without any database. Fast, deterministic, and CI-friendly.
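Fixture-based testing boils down to: feed the joint in-memory rows, compare against expected rows, no engine and no I/O involved. A sketch of that pattern against the `clean_orders` logic from the Quick Example, using a pure-Python stand-in rather than Rivet's test API:

```python
def clean_orders(rows):
    """Pure-Python stand-in for the clean_orders joint: keep positive amounts."""
    return [r for r in rows if r["amount"] > 0]

fixture = [
    {"id": 1, "customer_name": "Ada", "amount": 12.5},
    {"id": 2, "customer_name": "Bob", "amount": -3.0},  # should be filtered out
]
expected = [{"id": 1, "customer_name": "Ada", "amount": 12.5}]

# Deterministic and database-free, so it runs the same everywhere, including CI.
assert clean_orders(fixture) == expected
```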

Plugin System

Extend Rivet with engines, catalogs, and adapters. First-party plugins for DuckDB, Polars, PySpark, Postgres, AWS, and Databricks.

Interactive REPL

Full-screen terminal UI for exploring data, running ad-hoc queries, browsing catalogs, and debugging pipelines.


The Three Pillars

Joints

What to compute

Named, declarative units of computation. Sources read data, SQL and Python joints transform it, sinks write it out.

Engines

How to compute

Pluggable compute backends. DuckDB for local speed, PySpark for scale, Polars for DataFrames, Postgres for databases.

Catalogs

Where data lives

Named references to data locations — filesystems, databases, S3 buckets, Glue, Unity Catalog. Storage-agnostic.
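Conceptually a catalog is just a named mapping from logical table names to physical locations, so joints never hard-code paths. A toy sketch of that indirection (hypothetical structure and bucket name, not Rivet's internals):

```python
# Each catalog names a storage backend and a root location.
catalogs = {
    "local": {"kind": "filesystem", "root": "./data"},
    "lake": {"kind": "s3", "root": "s3://my-bucket/warehouse"},  # hypothetical bucket
}

def resolve(catalog: str, table: str) -> str:
    """Turn (catalog, table) into a concrete location at runtime."""
    return f"{catalogs[catalog]['root']}/{table}"

resolve("local", "raw_orders.csv")  # "./data/raw_orders.csv"
```

Swapping `local` for `lake` repoints every joint that references the catalog, with no pipeline changes.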


Quick Example

-- sources/raw_orders.sql
-- rivet:name: raw_orders
-- rivet:type: source
-- rivet:catalog: local
-- rivet:table: raw_orders.csv

-- joints/clean_orders.sql
-- rivet:name: clean_orders
-- rivet:type: sql
SELECT id, customer_name, amount, created_at
FROM raw_orders
WHERE amount > 0

-- sinks/orders_clean.sql
-- rivet:name: orders_clean
-- rivet:type: sink
-- rivet:catalog: local
-- rivet:table: orders_clean
-- rivet:upstream: [clean_orders]

# sources/raw_orders.yaml
name: raw_orders
type: source
catalog: local
table: raw_orders.csv

# joints/clean_orders.yaml
name: clean_orders
type: sql
upstream: [raw_orders]
sql: |
  SELECT id, customer_name, amount, created_at
  FROM raw_orders
  WHERE amount > 0

# sinks/orders_clean.yaml
name: orders_clean
type: sink
catalog: local
table: orders_clean
upstream: [clean_orders]

# joints/clean_orders.py
# rivet:name: clean_orders
# rivet:type: python
# rivet:upstream: [raw_orders]
import polars as pl
from rivet_core.models import Material

def transform(material: Material) -> pl.DataFrame:
    df = material.to_polars()
    return df.filter(pl.col("amount") > 0).select(
        "id", "customer_name", "amount", "created_at"
    )

from rivet_core.models import Joint

raw_orders = Joint(
    name="raw_orders",
    joint_type="source",
    catalog="local",
    table="raw_orders.csv",
)

clean_orders = Joint(
    name="clean_orders",
    joint_type="sql",
    upstream=["raw_orders"],
    sql="SELECT id, customer_name, amount, created_at FROM raw_orders WHERE amount > 0",
)

orders_clean = Joint(
    name="orders_clean",
    joint_type="sink",
    catalog="local",
    table="orders_clean",
    upstream=["clean_orders"],
)

$ rivet run
 compiled 3 joints in 45ms
  raw_orders        OK (5 rows)
  clean_orders      OK (4 rows)
  orders_clean      OK (4 rows)

  45ms | 3 joints | 1 group | 0 failures

Explore