
DuckDB

The rivet-duckdb plugin provides both a compute engine and a catalog for DuckDB. It is the recommended default for local analytics, fast SQL on files, and prototyping pipelines.

pip install 'rivetsql[duckdb]'

Engine Configuration

default:
  engines:
    - name: local
      type: duckdb
      options:
        threads: 4
        memory_limit: "8GB"
        temp_directory: /tmp/duckdb
        extensions: [httpfs, parquet]
      catalogs: [warehouse, files]

Engine Options

Option | Type | Default | Description
threads | int | System default | Threads for parallel execution
memory_limit | str | "4GB" | Max memory (e.g. "4GB", "512MB")
temp_directory | str | None | Spill-to-disk directory
extensions | list[str] | [] | Extensions to load at startup
concurrency_limit | int | 1 | Max fused groups executing in parallel. Keep at 1 for DuckDB (in-process, single-writer).
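
To make the option table concrete, here is a minimal sketch of how these options could translate into DuckDB statements at engine startup. The helper name `engine_startup_sql` is hypothetical and not part of rivet-duckdb; how the plugin actually applies the options is not described on this page. The `SET`, `INSTALL`, and `LOAD` statements themselves are standard DuckDB SQL.

```python
from typing import Any


def engine_startup_sql(options: dict[str, Any]) -> list[str]:
    """Translate engine options into DuckDB statements (hypothetical helper)."""
    statements: list[str] = []
    # threads has no fixed default: when unset, DuckDB chooses a value itself.
    if options.get("threads") is not None:
        statements.append(f"SET threads = {int(options['threads'])};")
    # memory_limit falls back to the documented default of "4GB".
    memory_limit = options.get("memory_limit", "4GB")
    statements.append(f"SET memory_limit = '{memory_limit}';")
    # temp_directory is optional; without it nothing is emitted.
    if options.get("temp_directory"):
        statements.append(f"SET temp_directory = '{options['temp_directory']}';")
    # Each extension is installed (a no-op if already present) and then loaded.
    for ext in options.get("extensions", []):
        statements.append(f"INSTALL {ext};")
        statements.append(f"LOAD {ext};")
    return statements


if __name__ == "__main__":
    example = {
        "threads": 4,
        "memory_limit": "8GB",
        "temp_directory": "/tmp/duckdb",
        "extensions": ["httpfs", "parquet"],
    }
    for statement in engine_startup_sql(example):
        print(statement)
```

concurrency_limit is left out on purpose: it describes how many fused groups run in parallel, not a DuckDB setting.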

Supported Catalog Types

Catalog type | Capabilities
duckdb | projection, predicate, limit, cast pushdown; join; aggregation
arrow | projection, predicate, limit, cast pushdown; join; aggregation
filesystem | projection, predicate, limit, cast pushdown; join; aggregation

Cross-Catalog Adapters

Adapter | Requires | Description
DuckDBLocalAdapter | (none) | Native SQL read/write for DuckDB engine → DuckDB catalog
S3DuckDBAdapter | boto3 | Read/write S3 via httpfs
GlueDuckDBAdapter | boto3 | Read/write Glue-managed tables
UnityDuckDBAdapter | requests | Read/write Unity Catalog tables

The DuckDBLocalAdapter supports native SQL write for replace, append, and truncate_insert strategies — the fused SQL is embedded directly into the write DDL, eliminating the Arrow round-trip. See Native SQL Write Optimization for details.
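
As an illustration of embedding the fused SQL into the write statement, the sketch below shows one plausible statement shape per strategy. The exact DDL the adapter emits is not given here, so both the function `native_write_sql` and the statement shapes are assumptions; they use only ordinary DuckDB SQL (`CREATE OR REPLACE TABLE ... AS`, `INSERT INTO ... SELECT`, `DELETE FROM`).

```python
def native_write_sql(table: str, fused_sql: str, strategy: str) -> list[str]:
    """Return the statements for one write (hypothetical, for illustration)."""
    if strategy == "replace":
        # Recreate the table from the query result in a single statement.
        return [f"CREATE OR REPLACE TABLE {table} AS {fused_sql}"]
    if strategy == "append":
        # Add the query result to the existing rows.
        return [f"INSERT INTO {table} {fused_sql}"]
    if strategy == "truncate_insert":
        # Keep the table definition, drop its rows, then insert the result.
        return [f"DELETE FROM {table}", f"INSERT INTO {table} {fused_sql}"]
    raise ValueError(f"no native SQL write for strategy {strategy!r}")


if __name__ == "__main__":
    query = "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id"
    for statement in native_write_sql("customer_totals", query, "replace"):
        print(statement)
```

In each case the query runs inside DuckDB and its result never leaves the engine, which is what removes the Arrow round-trip.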


Catalog Configuration

default:
  catalogs:
    - name: warehouse
      type: duckdb
      options:
        path: warehouse.duckdb
        read_only: false
        schema: main

Catalog Options

Option | Required | Type | Default | Description
path | No | str | ":memory:" | Database file path, or ":memory:"
read_only | No | bool | false | Open in read-only mode
schema | No | str | None | Default schema
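
The following sketch shows how the catalog options and their defaults might map onto a DuckDB connection. The helper `duckdb_connect_plan` is hypothetical; it only builds the keyword arguments one could pass to `duckdb.connect(database=..., read_only=...)` plus any follow-up statements, so it runs without DuckDB installed. Using `SET schema` for the default schema is an assumption about how the option is applied.

```python
from typing import Any


def duckdb_connect_plan(options: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Resolve catalog options into connect kwargs and setup SQL (hypothetical)."""
    path = options.get("path", ":memory:")
    read_only = bool(options.get("read_only", False))
    # An in-memory database starts empty, so opening it read-only is
    # meaningless; DuckDB is expected to refuse this combination.
    if path == ":memory:" and read_only:
        raise ValueError("read_only requires a database file, not ':memory:'")
    kwargs = {"database": path, "read_only": read_only}
    setup: list[str] = []
    schema = options.get("schema")
    if schema is not None:
        # Assumption: the default schema is applied with a SET statement.
        setup.append(f"SET schema = '{schema}';")
    return kwargs, setup


if __name__ == "__main__":
    kwargs, setup = duckdb_connect_plan(
        {"path": "warehouse.duckdb", "read_only": False, "schema": "main"}
    )
    print(kwargs)
    print(setup)
```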

Capabilities

Operation Supported
List tables
Get schema
Get metadata
Test connection

Complex Type Support

The DuckDB catalog reports complex column types during schema introspection, using the following type syntax:

  • Arrays: array<T> syntax (e.g., array<integer>, array<varchar>)
  • Structs: struct<field:type,...> syntax (e.g., struct<x:float,y:float>)
  • Nested types: Arbitrary nesting supported (e.g., array<struct<...>>)

Complex types are automatically mapped to Arrow types during schema introspection. See Complex Type Support for details.
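
To show how the type syntax above nests, here is a small recursive parser that turns a type string into a plain Python structure. It is an illustrative sketch, not the plugin's introspection code: `parse_type` and its tuple and dict output format are invented for this example. A real mapping to Arrow would presumably build `pa.list_(...)` and `pa.struct(...)` at the same points.

```python
from typing import Union

ParsedType = Union[str, tuple]


def _split_top_level(body: str) -> list[str]:
    """Split on commas that are not nested inside <...>."""
    parts, depth, start = [], 0, 0
    for i, ch in enumerate(body):
        if ch == "<":
            depth += 1
        elif ch == ">":
            depth -= 1
        elif ch == "," and depth == 0:
            parts.append(body[start:i])
            start = i + 1
    parts.append(body[start:])
    return [p for p in parts if p.strip()]


def parse_type(text: str) -> ParsedType:
    """Parse 'array<T>' and 'struct<name:type,...>' strings recursively."""
    text = text.strip()
    if text.startswith("array<") and text.endswith(">"):
        return ("array", parse_type(text[len("array<"):-1]))
    if text.startswith("struct<") and text.endswith(">"):
        fields = {}
        for part in _split_top_level(text[len("struct<"):-1]):
            # Only the first ':' separates the field name from its type.
            name, _, field_type = part.partition(":")
            fields[name.strip()] = parse_type(field_type)
        return ("struct", fields)
    return text  # scalar type name such as 'integer' or 'varchar'


if __name__ == "__main__":
    print(parse_type("array<struct<x:float,y:float>>"))
```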


File Formats

When reading from filesystem catalogs, DuckDB auto-detects the reader:

Extension | Reader
.parquet | read_parquet
.csv, .tsv | read_csv_auto
.json, .ndjson, .jsonl | read_json_auto
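
The extension-to-reader mapping can be expressed as a simple lookup. The sketch below is illustrative only: `reader_for` and `scan_sql` are hypothetical names, and the behavior for unknown extensions (raising an error) is an assumption, since this page does not say what the plugin does in that case. The reader functions themselves are DuckDB table functions.

```python
from pathlib import PurePosixPath

# Mapping taken from the table above.
READERS = {
    ".parquet": "read_parquet",
    ".csv": "read_csv_auto",
    ".tsv": "read_csv_auto",
    ".json": "read_json_auto",
    ".ndjson": "read_json_auto",
    ".jsonl": "read_json_auto",
}


def reader_for(path: str) -> str:
    """Pick the DuckDB reader function from the file extension."""
    suffix = PurePosixPath(path).suffix.lower()
    try:
        return READERS[suffix]
    except KeyError:
        raise ValueError(f"no DuckDB reader known for {path!r}") from None


def scan_sql(path: str) -> str:
    """Build a SELECT over the file, escaping single quotes in the path."""
    escaped = path.replace("'", "''")
    return f"SELECT * FROM {reader_for(path)}('{escaped}')"


if __name__ == "__main__":
    print(scan_sql("data/orders.parquet"))
    print(scan_sql("data/events.jsonl"))
```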

Usage Examples

Source

SQL:

-- rivet:name: raw_orders
-- rivet:type: source
-- rivet:catalog: warehouse
-- rivet:table: orders

YAML:

name: raw_orders
type: source
catalog: warehouse
table: orders

Python API:

from rivet_core.models import Joint

raw_orders = Joint(
    name="raw_orders",
    joint_type="source",
    catalog="warehouse",
    table="orders",
)

Transform

SQL:

-- rivet:name: order_totals
-- rivet:type: sql
-- rivet:upstream: raw_orders
SELECT customer_id, SUM(amount) AS total
FROM raw_orders
GROUP BY customer_id

YAML:

name: order_totals
type: sql
upstream: [raw_orders]
sql: |
  SELECT customer_id, SUM(amount) AS total
  FROM raw_orders
  GROUP BY customer_id

Python:

# joints/order_totals.py
# rivet:name: order_totals
# rivet:type: python
# rivet:upstream: [raw_orders]
from rivet_core.models import Material

def transform(material: Material) -> Material:
    table = material.to_arrow()
    # PyArrow names the aggregate column "amount_sum".
    totals = table.group_by("customer_id").aggregate([("amount", "sum")])
    # Select by name before renaming: the position of the key column in the
    # aggregate output differs between PyArrow versions.
    totals = totals.select(["customer_id", "amount_sum"])
    return totals.rename_columns(["customer_id", "total"])

Python API:

from rivet_core.models import Joint

order_totals = Joint(
    name="order_totals",
    joint_type="sql",
    upstream=["raw_orders"],
    sql="SELECT customer_id, SUM(amount) AS total FROM raw_orders GROUP BY customer_id",
)

Sink

SQL:

-- rivet:name: write_totals
-- rivet:type: sink
-- rivet:upstream: order_totals
-- rivet:catalog: warehouse
-- rivet:table: customer_totals
-- rivet:write_strategy: replace

YAML:

name: write_totals
type: sink
upstream: order_totals
catalog: warehouse
table: customer_totals
write_strategy: replace

Python API:

from rivet_core.models import Joint

write_totals = Joint(
    name="write_totals",
    joint_type="sink",
    upstream=["order_totals"],
    catalog="warehouse",
    table="customer_totals",
    write_strategy="replace",
)
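
The SQL variants of the examples above carry the same fields as the YAML and Python forms, encoded as `-- rivet:key: value` comment lines at the top of the file. The sketch below shows one way such a header could be read into a dictionary. It is not Rivet's actual parser: `parse_sql_joint` is a hypothetical name, and treating only the leading comment lines as annotations is an assumption.

```python
import re

# Matches lines such as "-- rivet:name: order_totals".
ANNOTATION = re.compile(r"^--\s*rivet:(\w+):\s*(.*)$")


def parse_sql_joint(text: str) -> dict[str, str]:
    """Split a SQL joint file into annotation fields and a SQL body."""
    meta: dict[str, str] = {}
    body: list[str] = []
    for line in text.splitlines():
        match = ANNOTATION.match(line.strip())
        if match and not body:
            # Annotations are only read from the header, before any SQL.
            meta[match.group(1)] = match.group(2).strip()
        elif line.strip() or body:
            # Skip blank lines in the header; keep everything after it.
            body.append(line)
    sql = "\n".join(body).strip()
    if sql:
        meta["sql"] = sql
    return meta


if __name__ == "__main__":
    source = """\
-- rivet:name: order_totals
-- rivet:type: sql
-- rivet:upstream: raw_orders
SELECT customer_id, SUM(amount) AS total
FROM raw_orders
GROUP BY customer_id
"""
    print(parse_sql_joint(source))
```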

Known Limitations

  • Single-process only: DuckDB runs in-process and cannot distribute work across machines. For larger workloads, consider PySpark or Databricks.
  • Concurrent writes: A DuckDB database file accepts a single writer at a time, so concurrent pipeline runs that write to the same file will fail.
  • Extension availability: Cross-catalog adapters depend on optional packages (boto3, requests). An adapter whose package is not installed is silently skipped.
  • Memory pressure: Large queries may exceed memory_limit. Set temp_directory to enable spill-to-disk.