
Rivet¶
Declarative SQL pipelines with multi-engine execution, quality checks, and built-in testing. Define once — run anywhere.
What is Rivet?¶
Rivet separates what to compute from how to compute it and where data lives. Write your pipeline logic once as declarative joints, then run it on DuckDB, Polars, PySpark, or Postgres — without changing a single line.
Pipeline DAG: raw_orders (source) → clean_orders (transform) → orders_summary (sink)
Multi-Engine¶
Swap between DuckDB, Polars, PySpark, and Postgres without rewriting pipelines. Adjacent SQL joints on the same engine are automatically fused into a single query.
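As a rough illustration of the fusion idea (plain sqlite3 here, not Rivet's actual planner), two adjacent SQL steps collapse into one statement by inlining the upstream joint's query as a subquery:

```python
# Sketch of SQL joint fusion: compose two SELECTs into a single query.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO raw_orders VALUES (?, ?)",
                 [(1, 10.0), (2, -5.0), (3, 7.5)])

clean = "SELECT id, amount FROM raw_orders WHERE amount > 0"
summary = "SELECT COUNT(*) AS n, SUM(amount) AS total FROM ({upstream})"

# Fused: the upstream SQL becomes a subquery, so the engine
# executes one statement instead of materializing an intermediate.
fused = summary.format(upstream=clean)
print(conn.execute(fused).fetchone())  # (2, 17.5)
```

The real planner fuses only adjacent joints on the same engine; a boundary (engine change, Python joint) forces materialization.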
Declarative Pipelines¶
Define joints in SQL, YAML, or Python. Rivet compiles them into an immutable execution plan — no imperative orchestration code needed.
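The compile step can be pictured as resolving the joint DAG into a fixed execution order. A minimal sketch using the standard library's graphlib (illustrative only; Rivet's real plan carries far more than an ordering):

```python
# Sketch: resolve declarative joint definitions into an immutable plan.
from graphlib import TopologicalSorter

joints = {
    "raw_orders": [],                 # source: no upstream
    "clean_orders": ["raw_orders"],   # transform
    "orders_clean": ["clean_orders"], # sink
}

# Topologically sort the DAG, then freeze the order as a tuple.
plan = tuple(TopologicalSorter(joints).static_order())
print(plan)  # ('raw_orders', 'clean_orders', 'orders_clean')
```

Because the plan is computed up front and never mutated, there is no imperative "then run this, then that" code to maintain.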
Quality Checks¶
Assertions validate data before writes. Audits verify data after writes. Catch bad data before it reaches your warehouse.
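The before/after split looks roughly like this (plain Python and sqlite3 for illustration, not Rivet's check syntax):

```python
# Assertion (pre-write) vs. audit (post-write), sketched by hand.
import sqlite3

rows = [(1, "ada", 10.0), (2, "bob", 7.5)]

# Assertion: validate data BEFORE the write; bad data never lands.
assert all(amount > 0 for _, _, amount in rows), "non-positive amount"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_clean (id INTEGER, customer_name TEXT, amount REAL)"
)
conn.executemany("INSERT INTO orders_clean VALUES (?, ?, ?)", rows)

# Audit: verify data AFTER the write actually landed.
(written,) = conn.execute("SELECT COUNT(*) FROM orders_clean").fetchone()
assert written == len(rows), "row count mismatch after write"
```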
Built-in Testing¶
Offline fixture-based tests validate joint logic without any database. Fast, deterministic, and CI-friendly.
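The idea, reduced to plain Python (fixtures as literal data; no Rivet test API is implied here):

```python
# Fixture-style test of the clean_orders logic: feed literal rows in,
# assert on what comes out. No database, no I/O, fully deterministic.
def clean_orders(rows):
    """Keep only orders with a positive amount."""
    return [r for r in rows if r["amount"] > 0]

fixture = [
    {"id": 1, "amount": 10.0},
    {"id": 2, "amount": -5.0},
    {"id": 3, "amount": 7.5},
]

result = clean_orders(fixture)
assert [r["id"] for r in result] == [1, 3]  # negative amount filtered out
```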
Plugin System¶
Extend Rivet with engines, catalogs, and adapters. First-party plugins for DuckDB, Polars, PySpark, Postgres, AWS, and Databricks.
Interactive REPL¶
Full-screen terminal UI for exploring data, running ad-hoc queries, browsing catalogs, and debugging pipelines.
The Three Pillars¶
Joints¶
What to compute
Named, declarative units of computation. Sources read data, SQL and Python joints transform it, sinks write it out.
Engines¶
How to compute
Pluggable compute backends. DuckDB for local speed, PySpark for scale, Polars for DataFrames, Postgres for databases.
Catalogs¶
Where data lives
Named references to data locations — filesystems, databases, S3 buckets, Glue, Unity Catalog. Storage-agnostic.
Quick Example¶
-- sources/raw_orders.sql
-- rivet:name: raw_orders
-- rivet:type: source
-- rivet:catalog: local
-- rivet:table: raw_orders.csv
-- joints/clean_orders.sql
-- rivet:name: clean_orders
-- rivet:type: sql
SELECT id, customer_name, amount, created_at
FROM raw_orders
WHERE amount > 0
-- sinks/orders_clean.sql
-- rivet:name: orders_clean
-- rivet:type: sink
-- rivet:catalog: local
-- rivet:table: orders_clean
-- rivet:upstream: [clean_orders]
# sources/raw_orders.yaml
name: raw_orders
type: source
catalog: local
table: raw_orders.csv
# joints/clean_orders.yaml
name: clean_orders
type: sql
upstream: [raw_orders]
sql: |
  SELECT id, customer_name, amount, created_at
  FROM raw_orders
  WHERE amount > 0
# sinks/orders_clean.yaml
name: orders_clean
type: sink
catalog: local
table: orders_clean
upstream: [clean_orders]
# joints/clean_orders.py
# rivet:name: clean_orders
# rivet:type: python
# rivet:upstream: [raw_orders]
import polars as pl
from rivet_core.models import Material
def transform(material: Material) -> pl.DataFrame:
    df = material.to_polars()
    return df.filter(pl.col("amount") > 0).select(
        "id", "customer_name", "amount", "created_at"
    )
from rivet_core.models import Joint
raw_orders = Joint(
    name="raw_orders",
    joint_type="source",
    catalog="local",
    table="raw_orders.csv",
)

clean_orders = Joint(
    name="clean_orders",
    joint_type="sql",
    upstream=["raw_orders"],
    sql="SELECT id, customer_name, amount, created_at FROM raw_orders WHERE amount > 0",
)

orders_clean = Joint(
    name="orders_clean",
    joint_type="sink",
    catalog="local",
    table="orders_clean",
    upstream=["clean_orders"],
)
$ rivet run
✓ compiled 3 joints in 45ms
raw_orders ✓ OK (5 rows)
clean_orders ✓ OK (4 rows)
orders_clean ✓ OK (4 rows)
45ms | 3 joints | 1 group | 0 failures