
Pipeline Model

Hakiri accepts manifests in TOML (hakiri.toml, pipelines/*.toml) and JSON (pipelines/*.json). Both deserialize into the same PipelineSpec and validate against the same JSON Schema. Humans hand-edit TOML (comments round-trip, no YAML indent footguns); agents emit JSON (LLMs generate JSON reliably, and $schema gives free validation and editor autocomplete).

hakiri.toml
[project]
name = "openhackers-warehouse"
version = "0.1.0"
[context]
path = ".hakiri/context" # local store location
default_destination = "context" # all pipelines land here unless overridden
[sync]
bucket = "r2://oh-context"
endpoint = "https://<accountid>.r2.cloudflarestorage.com"
region = "auto"
# credentials via env: HAKIRI_SYNC_KEY_ID / HAKIRI_SYNC_KEY_SECRET
[[pipeline]]
id = "github-issues"
source = { connector = "github", config = { repo = "OpenHackersClub/gctrl" } }
tables = ["issues", "pull_requests", "comments"]
schedule = "every 15m"
incremental = "updated_at" # cursor strategy
[[pipeline]]
id = "shopify-orders"
source = { connector = "shopify", config = { shop = "openhackers" } }
secrets = ["SHOPIFY_TOKEN"]
tables = ["orders", "customers", "products"]
schedule = "0 */2 * * *" # cron
incremental = "updated_at"
destination = { connector = "context" } # explicit

A JSON manifest has identical semantics, the same schema, and the same in-memory representation. The $schema line gives editor LSPs (VS Code, JetBrains, Helix) inline validation and autocomplete with no plugin beyond stock JSON Schema support.

{
"$schema": "https://hakiri.dev/schemas/v1/pipeline.json",
"id": "github-issues",
"source": { "connector": "github", "config": { "repo": "OpenHackersClub/gctrl" } },
"tables": ["issues", "pull_requests", "comments"],
"schedule": "every 15m",
"incremental": "updated_at"
}

The single source of truth is the Rust type, derived once and consumed everywhere:

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, JsonSchema)]
pub struct PipelineSpec {
    pub id: String,
    pub source: SourceSpec,
    pub tables: Vec<String>,
    #[serde(default)]
    pub destination: Option<DestinationSpec>,
    #[serde(default)]
    pub schedule: Option<Schedule>,
    #[serde(default)]
    pub incremental: Option<String>,
    #[serde(default)]
    pub transforms: Vec<TransformSpec>,
}
  • the toml crate (via serde) deserializes TOML → PipelineSpec
  • serde_json deserializes JSON → PipelineSpec
  • schemars derives the JSON Schema from PipelineSpec

hakiri schema export writes the generated schema to .hakiri/schema/pipeline.json; it’s also published at a stable URL (final domain TBD with the project name) for editor integration.

On startup the runtime scans the project:

hakiri.toml # project-level config + inline [[pipeline]] blocks
pipelines/*.toml # one pipeline per file, typically human-edited
pipelines/*.json # one pipeline per file, typically agent-authored

Each file deserializes into PipelineSpec and is validated against the JSON Schema. Pipelines merge by id. Duplicate ids across files are a hard error with explicit diagnostics:

error: pipeline `github-issues` defined in two places:
hakiri.toml:42
pipelines/github-issues.json:1
hint: keep one source of truth; run `hakiri pipeline convert github-issues --to json` to migrate
  • pipeline.create defaults to writing JSON at pipelines/<id>.json — the agent path of least resistance. Override with --format=toml to append to hakiri.toml.
  • pipeline.edit preserves the existing file’s format. TOML edits go through toml_edit so comments and field ordering survive.
  • pipeline.convert <id> --to json|toml produces the alternate form. Lossless except for TOML comments, which JSON can’t represent.
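
The merge-by-id rule and its duplicate check can be sketched in a few lines. This is an illustrative Python model, not the runtime's Rust code: merge_pipelines and DuplicatePipelineError are hypothetical names, and the error text only approximates the diagnostic shown earlier.

```python
class DuplicatePipelineError(Exception):
    """Raised when the same pipeline id appears in more than one place."""


def merge_pipelines(sources):
    """Merge (location, spec) pairs into {id: spec}; duplicates are a hard error."""
    merged = {}
    seen_at = {}
    for location, spec in sources:
        pipeline_id = spec["id"]
        if pipeline_id in merged:
            raise DuplicatePipelineError(
                f"pipeline `{pipeline_id}` defined in two places: "
                f"{seen_at[pipeline_id]}, {location}"
            )
        merged[pipeline_id] = spec
        seen_at[pipeline_id] = location
    return merged


specs = merge_pipelines([
    ("hakiri.toml:42", {"id": "github-issues"}),
    ("pipelines/shopify-orders.json:1", {"id": "shopify-orders"}),
])
print(sorted(specs))
```

Failing on the first duplicate, rather than letting a later file silently override an earlier one, keeps each pipeline tied to exactly one file.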

Each population uses the syntax that suits it (humans → TOML, agents → JSON) against one shared canonical schema in Rust. Rationale, alternatives considered (TOML-only, JSON-only, YAML), and trade-offs: see ADR-0003.

Hakiri operates on a desired-state model — like Terraform for infrastructure, Kubernetes controllers for cluster state, or dbt for warehouse models. The manifest declares what should exist; the runtime reconciles reality to it. There are no imperative scripts in the user-facing surface.

# Show what would change without doing anything
hakiri plan
# → for each pipeline, compute the diff vs. current context state:
# - rows the source has that the destination doesn't (estimated)
# - schema changes that would be applied
# - new pipelines that haven't run yet
# Apply all pipelines (= run each one to convergence)
hakiri apply
# Apply one pipeline
hakiri apply github-issues
# Continuous reconciliation (long-running)
hakiri serve
# → scheduler fires each pipeline on its declared schedule
# → at any clean idle point, system state matches the manifest

Properties:

  • Idempotent. Running hakiri apply twice with no source changes is a no-op (or re-runs only the pipelines whose schedule has elapsed).
  • Convergent. Interrupted runs resume from the last checkpoint; the final state is independent of how many crashes happened along the way.
  • Inspectable. hakiri plan --json produces a structured diff agents can reason about before deciding to apply.
  • Reviewable. Manifests live in git; the diff between two commits is the change to the system. Code review applies to data pipelines the same way it applies to infrastructure.

This is GitOps for data pipelines: commit the manifest, the runtime reconciles. No “first run the migration script, then the backfill, then the cutover”. The manifest is the program; the runtime is the interpreter.

stateDiagram-v2
  [*] --> Idle
  Idle --> Discovering: trigger
  Discovering --> Reading: schema OK
  Discovering --> Failed: schema incompatible
  Reading --> Writing: batch ready
  Writing --> Reading: more data
  Writing --> Committing: source exhausted
  Committing --> Idle: cursor saved, snapshot durable
  Reading --> Failed: transient (retries exhausted) / permanent
  Failed --> Idle: operator/agent decision
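
The run lifecycle in the diagram can be read as a transition table. The sketch below encodes it in Python as an illustration only. The event labels are shortened from the diagram's edge labels, and step and run are hypothetical helper names.

```python
# (state, event) -> next state, transcribed from the diagram above.
TRANSITIONS = {
    ("Idle", "trigger"): "Discovering",
    ("Discovering", "schema OK"): "Reading",
    ("Discovering", "schema incompatible"): "Failed",
    ("Reading", "batch ready"): "Writing",
    ("Writing", "more data"): "Reading",
    ("Writing", "source exhausted"): "Committing",
    ("Committing", "cursor saved"): "Idle",
    ("Reading", "error"): "Failed",
    ("Failed", "operator decision"): "Idle",
}


def step(state, event):
    """Return the next state, or raise if the diagram has no such edge."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state} on {event!r}") from None


def run(events, state="Idle"):
    """Fold a sequence of events over the state machine."""
    for event in events:
        state = step(state, event)
    return state


final = run([
    "trigger", "schema OK",
    "batch ready", "more data",      # first batch written, more to read
    "batch ready", "source exhausted",
    "cursor saved",
])
print(final)
```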

A trigger is one of:

  • scheduled — cron or every <duration> expression
  • manual — hakiri run <pipeline>
  • event — a webhook into the daemon, an SQS message, a Cloudflare queue message

Each pipeline owns a Cursor per table. Cursors are opaque to the runtime — the connector defines their semantics. Three common shapes:

  • Timestamp cursor: {"updated_at": "2026-05-11T08:00:00Z"} for any source with updated_at
  • Numeric cursor: {"id": 12834782} for monotonic id sources
  • Opaque token: {"next_page_token": "abc.."} for paginated APIs (GitHub, Notion)

Cursors live in ~/.hakiri/<project>/state.sqlite:

CREATE TABLE pipeline_cursor (
pipeline_id TEXT NOT NULL,
table_name TEXT NOT NULL,
cursor_json TEXT NOT NULL,
updated_at TEXT NOT NULL,
PRIMARY KEY (pipeline_id, table_name)
);
CREATE TABLE pipeline_run (
run_id TEXT PRIMARY KEY,
pipeline_id TEXT NOT NULL,
started_at TEXT NOT NULL,
ended_at TEXT,
status TEXT NOT NULL, -- pending | running | success | failed | partial
row_count INTEGER,
bytes INTEGER,
error_kind TEXT,
error_msg TEXT,
trace_id TEXT -- OTel trace
);
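
As an illustration of how a run might persist its position, the sketch below applies the pipeline_cursor DDL above to an in-memory SQLite database and upserts a cursor with Python's standard sqlite3 module. save_cursor and load_cursor are hypothetical helpers, and the ON CONFLICT upsert assumes SQLite 3.24 or newer.

```python
import json
import sqlite3
from datetime import datetime, timezone

DDL = """
CREATE TABLE pipeline_cursor (
  pipeline_id TEXT NOT NULL,
  table_name  TEXT NOT NULL,
  cursor_json TEXT NOT NULL,
  updated_at  TEXT NOT NULL,
  PRIMARY KEY (pipeline_id, table_name)
);
"""


def save_cursor(db, pipeline_id, table_name, cursor):
    # One row per (pipeline, table); a later save overwrites the earlier one.
    db.execute(
        """
        INSERT INTO pipeline_cursor (pipeline_id, table_name, cursor_json, updated_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (pipeline_id, table_name)
        DO UPDATE SET cursor_json = excluded.cursor_json,
                      updated_at  = excluded.updated_at
        """,
        (pipeline_id, table_name, json.dumps(cursor),
         datetime.now(timezone.utc).isoformat()),
    )


def load_cursor(db, pipeline_id, table_name):
    row = db.execute(
        "SELECT cursor_json FROM pipeline_cursor "
        "WHERE pipeline_id = ? AND table_name = ?",
        (pipeline_id, table_name),
    ).fetchone()
    return json.loads(row[0]) if row else None


db = sqlite3.connect(":memory:")
db.executescript(DDL)
save_cursor(db, "github-issues", "issues", {"updated_at": "2026-05-11T08:00:00Z"})
save_cursor(db, "github-issues", "issues", {"updated_at": "2026-05-11T08:15:00Z"})
print(load_cursor(db, "github-issues", "issues"))
```

The cursor is stored as JSON text because the runtime treats it as opaque; only the connector interprets its contents.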

State sync rules:

  • Cursors are last-write-wins across replicas — the source-of-truth is whichever replica wrote the most recent successful run. This is safe because cursors represent “I’ve read up to here”; advancing past a peer doesn’t lose data, only causes duplicate fetches that the destination deduplicates.
  • Runs are append-only — every replica writes its own runs; merging is a union.
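
The two sync rules can be sketched as two small merge functions. This is a Python model under stated assumptions: each replica's state is an in-memory dictionary, timestamps are ISO-8601 UTC strings in one fixed format (so string comparison matches time order), and merge_cursors and merge_runs are hypothetical names.

```python
def merge_cursors(local, remote):
    """Last-write-wins. Each side maps (pipeline_id, table_name) -> (updated_at, cursor)."""
    merged = dict(local)
    for key, (updated_at, cursor) in remote.items():
        if key not in merged or updated_at > merged[key][0]:
            merged[key] = (updated_at, cursor)
    return merged


def merge_runs(local, remote):
    """Append-only union. Each side maps run_id -> run record."""
    return {**local, **remote}


local_cursors = {("github-issues", "issues"): ("2026-05-11T08:00:00Z", {"id": 100})}
remote_cursors = {
    ("github-issues", "issues"): ("2026-05-11T08:15:00Z", {"id": 140}),
    ("github-issues", "comments"): ("2026-05-11T08:10:00Z", {"id": 7}),
}
cursors = merge_cursors(local_cursors, remote_cursors)
runs = merge_runs({"run-a": {"status": "success"}}, {"run-b": {"status": "failed"}})
print(cursors[("github-issues", "issues")], sorted(runs))
```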

See 04-context-store.md for the wire format.

Hakiri infers schemas from incoming batches and reconciles against the stored schema:

| Change | Default behavior |
| --- | --- |
| New column added | Append it; backfill NULL for older rows |
| Column dropped from source | Keep stored column; new rows get NULL |
| Type widening (int32 → int64) | Allow; rewrite is lazy on next read |
| Type narrowing (int64 → int32) | Reject with SchemaIncompatible; emit guidance to either cast in transform or hakiri schema accept --pipeline X |
| Column renamed | Source must declare via cursor metadata; otherwise treated as drop+add |
| Primary key changed | Reject; requires explicit migration |

Schema decisions are recorded in the catalog, so a hakiri schema log <pipeline> shows every evolution event.
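
A minimal sketch of the default rules for added columns, dropped columns, widening, and narrowing follows. It is a Python illustration, not the runtime's logic: schemas are modeled as plain name-to-type dictionaries, the WIDENINGS set is an illustrative subset, reconcile is a hypothetical name, and renames and primary-key changes are left out.

```python
WIDENINGS = {("int32", "int64"), ("float32", "float64")}  # illustrative subset


class SchemaIncompatible(Exception):
    """Raised for type changes that are not a known widening."""


def reconcile(stored, incoming):
    """Return (new_schema, events) for one incoming batch schema."""
    result = dict(stored)
    events = []
    for column, new_type in incoming.items():
        old_type = stored.get(column)
        if old_type is None:
            result[column] = new_type            # new column: append it
            events.append(("added", column))
        elif old_type == new_type:
            continue
        elif (old_type, new_type) in WIDENINGS:
            result[column] = new_type            # widening: allow
            events.append(("widened", column))
        else:
            raise SchemaIncompatible(f"{column}: {old_type} -> {new_type}")
    for column in stored:
        if column not in incoming:
            # Dropped at the source: the stored column is kept.
            events.append(("dropped_from_source", column))
    return result, events


schema, events = reconcile(
    {"id": "int32", "title": "string"},
    {"id": "int64", "author": "string"},
)
print(schema, events)
```

The returned events list plays the role of the evolution log: each entry is one decision that could be recorded in the catalog.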

Transforms — Polars expressions, authored in Python or TypeScript


Hakiri ships a real transformation layer, but it does not embed a Python interpreter or a JavaScript runtime to run user code at pipeline time. Instead, transforms are Polars expression trees — authored in Python or TypeScript using the existing Polars bindings, serialized into the manifest, and executed entirely in Rust by Polars’s query engine.

Rationale and alternatives (PyO3-embedded Python, deno_core JS, DataFusion as the engine, custom DSL): see ADR-0010.

  • Authoring stays in the language teams already use. Data and analytics engineers write Python; frontend-leaning teams write TypeScript. Both languages have first-class Polars bindings.
  • Execution stays in Rust. Polars’s Rust engine evaluates the expression tree against Arrow batches; the Hakiri binary does not embed Python or Node, preserving the footprint budget (Pillar 1, Challenge 5).
  • Sandboxed by construction. A Polars expression cannot open a socket, read the filesystem, or call out to a model API. The expression algebra is the sandbox.
  • One canonical plan, three authoring surfaces. Rust authors use the Polars Rust API directly; Python authors use hakiri-py; TypeScript authors use hakiri-ts. All three produce the same serialized expression plan in the manifest.

The expression algebra inherits Polars’s full set: select, with_columns, filter, rename, cast, sort, group_by + aggregations, join (between in-flight batches and pre-loaded reference tables), window functions, string and temporal expressions, regex, JSON-path extraction, struct/list operations. The manifest’s transforms = [...] list is a LazyFrame plan — composable, optimizable, and serializable.

transforms/github_issues.py
import hakiri as hk
import polars as pl

@hk.pipeline("github-issues")
def github_issues_transforms(df: pl.LazyFrame) -> pl.LazyFrame:
    return (
        df.rename({"user.login": "author"})
        .with_columns([
            pl.col("created_at").cast(pl.Datetime("ms", "UTC")),
            pl.col("body").str.len_chars().alias("body_len"),
            pl.col("labels").list.eval(pl.element().struct.field("name")).alias("label_names"),
        ])
        .filter(pl.col("state") == "open")
    )

# Compile to the manifest's expression plan:
# $ hakiri compile transforms/github_issues.py
# → writes pipelines/github-issues.transforms.json (Polars's serialized plan format)

transforms/github_issues.ts
import { pipeline, pl } from "hakiri";

export default pipeline("github-issues", (df) =>
  df.rename({ "user.login": "author" })
    .withColumns([
      pl.col("created_at").cast(pl.Datetime("ms", "UTC")),
      pl.col("body").str.lenChars().alias("body_len"),
      pl.col("labels").list.eval(pl.element().struct.field("name")).alias("label_names"),
    ])
    .filter(pl.col("state").eq("open"))
);
// Compile via `hakiri compile transforms/github_issues.ts`

The compiled artifact is JSON the runtime deserializes into a LazyFrame plan. It is checked into the repo alongside the source .py/.ts for reproducibility and review:

pipelines/github-issues.transforms.json
{
"$schema": "https://hakiri.dev/schemas/v1/transforms.json",
"pipeline_id": "github-issues",
"engine": "polars",
"engine_version": "0.43.0",
"source_lang": "python",
"source_hash": "sha256:…",
"plan": { /* Polars's serialized LazyFrame plan */ }
}

The runtime reads only the plan field; the source_* fields are provenance. PR review is on the diff of either the source file (if checked in) or the plan JSON.

Where transforms run in the pipeline graph

flowchart LR
  Source[Source connector<br/>(WASM Component)] -->|Arrow RecordBatch| Transform[Transforms<br/>(Polars LazyFrame plan)]
  Transform -->|Arrow RecordBatch| Destination[Destination<br/>(context store / sync)]

The connector emits Arrow record batches; the transform plan rewrites them; the destination consumes them. Polars’s columnar engine works on the same Arrow buffers the connector produced — zero-copy through the pipeline.

Polars covers ~95% of the per-batch transform needs we expect. For the remainder — calling a model for entity extraction, hitting a sidecar enrichment service, anything stateful across batches — write a WASM Component that implements a small transform WIT interface, sister to the source/destination interfaces from ADR-0001:

// in wit/connector.wit
interface transform {
    // `schema` is assumed to be defined in `types` alongside record-batch.
    use types.{record-batch, schema, error};
    prepare: func(schema-in: schema) -> result<schema, error>;
    apply: func(batch: record-batch) -> result<record-batch, error>;
}

Python authors compile to WASM via componentize-py; TypeScript authors via jco. The component runs in the same Wasmtime host as connectors, with the same capability-declared host access — a “call an embedding model” transform declares the network capability and the operator grants it, exactly as for connectors.

This keeps the two-tier story clean: declarative Polars expressions for the common case, sandboxed WASM Components for the irregular case. No third tier (no embedded Python at runtime, no JS interpreter).

The thesis from ELT — keep transforms close to the destination — still holds for business-logic transforms (joins across pipelines, large aggregations, dimensional models). Those belong in SQL views over the context store, not in pipeline transforms.

| Use a Polars transform when… | Use a SQL view in the context store when… |
| --- | --- |
| The transform is per-source-batch and stateless | The transform joins across multiple tables |
| Type casting, renaming, struct flattening, list/regex extraction | Aggregations the agent will query repeatedly |
| Filtering before write (drop heartbeat events) | Dimensional modeling, slowly-changing dimensions |
| Per-record redaction tied to source schema | Cross-source customer-360 views |

Both compose: a Polars transform shapes the landing format; a DuckDB view shapes the agent-facing query surface.

Two strategies, pick per pipeline:

  • In-process scheduler — runs while hakiri serve is up. Good for self-hosted.
  • External trigger — hakiri run <pipeline> from a cron, Cloudflare Workflow, GitHub Action, or AWS EventBridge rule. The binary exits 0/non-zero per run.

The scheduler is intentionally simple: no DAG dependencies, no pipeline_A.success → pipeline_B.start orchestration in v0. If you need a DAG, drive it from outside (Workflows / Step Functions / Airflow). This keeps the runtime’s job narrow.

Backfilling — pulling a source’s historical data on first run before settling into incremental sync — is the common case, not the edge case. A first run against GitHub Issues for a 10-year-old repo is hours of paginated reads; against a 50M-row Postgres table it’s a multi-day CDC seed. The runtime treats backfill as a first-class pipeline phase with its own scheduling, parallelism, and resume semantics — not as “just a long first run that might time out.”

A pipeline has three observable phases. The catalog (meta.sqlite) records the current phase per pipeline.

stateDiagram-v2
  [*] --> planning
  planning --> backfilling: chunks discovered
  backfilling --> backfilling: chunk completed,<br/>more remain
  backfilling --> streaming: cursor caught up to "now"
  streaming --> streaming: incremental tick
  streaming --> backfilling: gap detected<br/>(operator-driven)
  • planning — discover() is called; the runtime computes the chunk plan from declared cursor kind, source statistics, and the operator’s backfill_window config. No data is fetched yet.
  • backfilling — chunks execute, possibly across many scheduler ticks, possibly in parallel where the source allows. Each completed chunk commits an independent Parquet file and advances the catalog’s per-chunk cursor.
  • streaming — the cursor has caught up to the source’s “now” within the declared SLA. Subsequent scheduled ticks pull only the incremental delta.

While a pipeline is backfilling, scheduled cron ticks do not start a fresh run — they either extend the backfill (dispatch more chunks) or no-op if the chunk queue is full. This prevents the “every 15 minutes a new pull starts but the previous one is still backfilling” pile-up that bites naïve schedulers.

Chunk plan — what gets backfilled in what unit


The chunk plan is the unit of resumability and parallelism. The shape depends on the source’s cursor-kind (declared via the WIT export from 02-connectors.md, see also ADR-0005):

| cursor-kind | Natural chunk | Parallelism | Example |
| --- | --- | --- | --- |
| monotonic (timestamp / autoincrement) | Time window or id range | Safe across chunks; each is independent | created_at BETWEEN '2020-01-01' AND '2020-02-01' for one chunk |
| opaque-token (vendor page token) | Page-token segment, sequential by design | None — tokens are stateful on the vendor side; must be walked in order | Stripe starting_after, GitHub Link headers |
| snapshot-id (LSN, version-id) | LSN range | Sequential per stream; parallel across streams | Postgres logical replication: backfill the table snapshot, then replay WAL from a noted LSN |
| partitioned-monotonic (declared by the connector) | Partition × time window | Safe across partitions | GitHub Issues by repo; one chunk per (repo, month) — repos are independent |

Connectors declare their cursor kind and any partition keys via a small WIT export the runtime reads at discover():

// in wit/connector.wit
record backfill-hint {
cursor-kind: cursor-kind, // see ADR-0005
partition-keys: list<string>, // empty = single-stream
natural-window: option<duration>, // hint for time-windowed chunking
rate-budget: option<rate-budget>, // requests/sec the source tolerates
}
interface source {
backfill-hint: func() -> backfill-hint;
// ... rest of the source interface
}

The runtime builds the chunk plan from the hint plus operator overrides (backfill_window = "30d", backfill_parallelism = 4). The plan is stored in the catalog as pipeline_chunks(pipeline_id, chunk_id, predicate, status, attempts, started_at, completed_at, cursor_committed) so progress survives crashes.

[[pipeline]]
id = "github-issues-historical"
source = { connector = "github", config = { repo = "torvalds/linux" } }
tables = ["issues"]
incremental = "updated_at"
[pipeline.backfill]
window = "30d" # one chunk per 30-day slice
parallelism = 4 # how many chunks in flight
rate_budget = "60/min" # cap source requests
start_from = "2010-01-01" # don't go further back than this
max_chunks_per_tick = 8 # how many chunks to dispatch per scheduler tick

Defaults come from the connector’s backfill-hint; the operator only sets what they want to override. The runtime warns at hakiri plan time if the configured parallelism violates the connector’s declared safety (opaque-token sources reject parallelism > 1).
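
For a monotonic timestamp cursor, the chunk plan amounts to slicing a time range into fixed windows and checking the requested parallelism against the cursor kind. The Python sketch below illustrates that idea only. plan_time_chunks and check_parallelism are hypothetical names, and the half-open [start, end) window convention is an assumption.

```python
from datetime import date, timedelta


def plan_time_chunks(start, end, window_days):
    """Split [start, end) into consecutive half-open windows of window_days."""
    if window_days <= 0:
        raise ValueError("window_days must be positive")
    chunks = []
    lo = start
    while lo < end:
        hi = min(lo + timedelta(days=window_days), end)  # last window may be short
        chunks.append((lo, hi))
        lo = hi
    return chunks


def check_parallelism(cursor_kind, parallelism):
    """Opaque-token sources must be walked in order, so parallelism > 1 is invalid."""
    if cursor_kind == "opaque-token" and parallelism > 1:
        raise ValueError("opaque-token sources do not support parallelism > 1")
    return parallelism


chunks = plan_time_chunks(date(2010, 1, 1), date(2010, 4, 1), window_days=30)
for lo, hi in chunks:
    print(lo.isoformat(), hi.isoformat())
```

Half-open windows mean adjacent chunks share a boundary date without both claiming the rows that fall exactly on it.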

Crash resume — node-id + attempt-isolated writes


Every chunk is its own atomic unit, with writes isolated per node and per attempt so a crash never produces a mixed-author chunk directory:

  1. Lease. Worker acquires a row-level lock on pipeline_chunks(chunk_id, attempt) (catalog backend: SELECT FOR UPDATE SKIP LOCKED on Postgres; DO actor scope on Cloudflare; BEGIN IMMEDIATE + retry on SQLite — see ADR-0011 for the per-backend primitive). Lease records the worker’s node-id and the attempt number.

  2. Pull, write to isolated path. Connector’s open(predicate, cursor=None) opens the chunk; the runtime iterates next() and writes Parquet batches under:

    tables/<t>/data/runs/<run-id>/<node-id>/chunk-<chunk-id>/attempt-<n>/
    part-00000.parquet
    part-00001.parquet
    ...

    This path is never shared between workers or attempts. Worker A writing chunk-47 attempt-1 and worker B writing chunk-47 attempt-2 produce disjoint files even if A’s leftovers survive the crash.

  3. Commit. Once next() returns none, the runtime writes the chunk’s _manifest.json to the attempt directory, then in a single catalog transaction:

    • Sets pipeline_chunks(chunk_id).status = 'done'.
    • Sets pipeline_chunks(chunk_id).winning_attempt = <node-id>/attempt-<n>.
    • The catalog row, not directory presence, is the commit signal. Queries against the run read the catalog to find the winning attempt; non-winning attempt directories are GC’d after the retention window.
  4. Crash. If the worker dies mid-chunk:

    • The lease expires after the TTL (default 10 minutes).
    • Another worker claims the chunk and increments attempt (chunk-47 now has attempt-2 lease).
    • The new worker writes to its own <node-id>/chunk-47/attempt-2/ path. It never touches the leftover files from attempt-1.
    • Once attempt-2 commits, the catalog records it as winning; attempt-1’s leftovers are GC’d.
    • If attempt-1 had in fact already committed (rare: the worker crashed after writing the catalog row but before the lease expired, which is only possible in a small clock-skew window), the existing catalog row wins and attempt-2 does not claim the chunk.

The invariant: the catalog row is the commit point; directory presence alone is never the truth. A reader that finds Parquet without a corresponding catalog done row treats it as in-flight or orphaned. The cleanup task GCs orphans after a retention window.

Independent commit means a 312-chunk backfill that crashes after committing 247 chunks resumes with the remaining 65: the committed chunks are durable Parquet pointed at by durable catalog rows, not in-memory progress.
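
The lease, isolated-write, and commit steps can be modeled in memory to show why a stale worker cannot overwrite a committed chunk. This is a simplified Python sketch: the Catalog class is a hypothetical stand-in for pipeline_chunks, lease expiry (the TTL) is not modeled, and a second lease call is taken to mean that the first lease has already expired.

```python
def attempt_dir(table, run_id, node_id, chunk_id, attempt):
    # Path layout copied from the text; never shared between workers or attempts.
    return (f"tables/{table}/data/runs/{run_id}/{node_id}/"
            f"chunk-{chunk_id}/attempt-{attempt}/")


class Catalog:
    """In-memory stand-in for pipeline_chunks (hypothetical; no lease TTL)."""

    def __init__(self):
        self.rows = {}

    def lease(self, chunk_id, node_id):
        row = self.rows.setdefault(
            chunk_id, {"status": "pending", "attempt": 0, "winning_attempt": None})
        if row["status"] == "done":
            return None  # an existing commit wins; nothing to claim
        row["attempt"] += 1
        row["status"] = "running"
        row["holder_node"] = node_id
        return row["attempt"]

    def commit(self, chunk_id, node_id, attempt):
        row = self.rows[chunk_id]
        if row["status"] == "done":
            return False  # another attempt already committed
        row["status"] = "done"
        row["winning_attempt"] = f"{node_id}/attempt-{attempt}"
        return True


catalog = Catalog()
first = catalog.lease(47, "node-a")    # worker A starts, then crashes
second = catalog.lease(47, "node-b")   # lease expired; worker B takes over
committed = catalog.commit(47, "node-b", second)
late = catalog.commit(47, "node-a", first)  # A's stale commit is refused
print(catalog.rows[47]["winning_attempt"], late)
```

Because the two attempts write under different directories, the refused commit leaves only orphaned files for the cleanup task, never a mixed chunk.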

Workers (in hakiri serve, in CF Containers, or in Fargate tasks) pull chunks from the catalog’s claim queue:

-- worker claim
UPDATE pipeline_chunks
SET holder_node = $worker_id, leased_until = now() + interval '10 min', status = 'running'
WHERE chunk_id = (
SELECT chunk_id FROM pipeline_chunks
WHERE pipeline_id = $pid AND status = 'pending'
ORDER BY chunk_id ASC
LIMIT 1 FOR UPDATE SKIP LOCKED
)
RETURNING chunk_id;

FOR UPDATE SKIP LOCKED (Postgres) and equivalent atomic CAS (DO, DynamoDB) make the claim queue contention-free under high worker counts. Per-pipeline parallelism caps how many chunks can be running concurrently; the runtime enforces it via a counting semaphore in the catalog.
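
On the SQLite backend the same claim can be approximated with BEGIN IMMEDIATE, which takes the write lock before the SELECT so that two workers cannot pick the same chunk. The Python sketch below uses the standard sqlite3 module. The trimmed table definition and the claim_chunk helper are assumptions for illustration, and the retry-on-busy loop mentioned earlier is omitted.

```python
import sqlite3


def claim_chunk(db, pipeline_id, worker_id):
    """Claim the lowest pending chunk for worker_id, or return None if none remain."""
    db.execute("BEGIN IMMEDIATE")  # acquire the write lock before reading
    try:
        row = db.execute(
            "SELECT chunk_id FROM pipeline_chunks "
            "WHERE pipeline_id = ? AND status = 'pending' "
            "ORDER BY chunk_id ASC LIMIT 1",
            (pipeline_id,),
        ).fetchone()
        if row is not None:
            db.execute(
                "UPDATE pipeline_chunks SET holder_node = ?, status = 'running' "
                "WHERE chunk_id = ?",
                (worker_id, row[0]),
            )
        db.execute("COMMIT")
    except Exception:
        db.execute("ROLLBACK")
        raise
    return row[0] if row else None


# isolation_level=None puts sqlite3 in autocommit mode so BEGIN/COMMIT are explicit.
db = sqlite3.connect(":memory:", isolation_level=None)
db.execute(
    "CREATE TABLE pipeline_chunks ("
    "chunk_id INTEGER PRIMARY KEY, pipeline_id TEXT, status TEXT, holder_node TEXT)"
)
db.executemany(
    "INSERT INTO pipeline_chunks VALUES (?, ?, 'pending', NULL)",
    [(1, "github-issues"), (2, "github-issues")],
)
claims = [claim_chunk(db, "github-issues", f"worker-{i}") for i in range(3)]
print(claims)
```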

Workflow-shaped clouds (CF Workflows, AWS Step Functions)


ADR-0009 and 06-deployment.md describe how the first-class clouds host pipelines. Backfill maps onto their primitives without rewriting the manifest:

| Concern | Cloudflare Workflows | AWS Step Functions |
| --- | --- | --- |
| One workflow instance per pipeline run | yes (WorkflowEntrypoint) | yes (state machine execution) |
| Workflow step → chunk | one step.do() per chunk | one Task state per chunk |
| Step result size cap | 1 MiB → step returns R2 key, not payload | 256 KiB → state returns S3 key |
| Steps per workflow cap | 1024 steps | 25,000 transitions (Standard) |
| Long backfill (> per-workflow cap) | Reconciler dispatches a fresh workflow instance per ≤1000-chunk batch | Step Functions parent-child or new execution per batch |
| Long backfill (> 6h wall) | Workflow chains across instances at the next cron tick | Standard workflow extends; Express needs Fargate |
| Cold-start cost | Container ~2–8s (keep-warm during backfill) | Fargate Task warm during backfill |

The runtime knows when it’s running under a workflow host and decomposes accordingly — same pipeline_chunks table, same chunk semantics, different orchestration primitive on top.

Three knobs the operator can set, surfaced in hakiri plan output before any data moves:

  1. rate_budget — caps source API calls. Hard-enforced via a token-bucket the runtime owns; connectors see throttled next() returns, never 429s from the source.
  2. destination_writes_per_sec — caps Parquet flush rate. Useful when destination is R2 in a different region and egress matters.
  3. max_parallel_chunks — global cap across all backfilling pipelines on this worker. Prevents one 10-pipeline project from saturating the worker pool.
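
The rate_budget knob is described as a token bucket. A minimal Python sketch of that technique follows. The TokenBucket class and parse_rate_budget helper are hypothetical, the clock is passed in explicitly to keep the example deterministic, and only the "60/min" form appears in the text (the other units are assumptions).

```python
class TokenBucket:
    """Classic token bucket: refills at rate_per_sec, holds at most capacity tokens."""

    def __init__(self, rate_per_sec, capacity, now=0.0):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = now

    def try_acquire(self, now):
        # Refill for the elapsed time, then spend one token if available.
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


def parse_rate_budget(text):
    """Turn a budget such as "60/min" into requests per second."""
    count, unit = text.split("/")
    seconds = {"sec": 1, "min": 60, "hour": 3600}[unit]  # units beyond "min" are assumed
    return int(count) / seconds


bucket = TokenBucket(rate_per_sec=parse_rate_budget("60/min"), capacity=2)
results = [bucket.try_acquire(now=0.0) for _ in range(3)]  # burst of three at t=0
results.append(bucket.try_acquire(now=1.0))                # one token refilled by t=1
print(results)
```

A caller that receives False would wait and retry, which is how a connector ends up seeing throttled next() returns instead of 429 responses from the source.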

Each chunk emits an OTel span with:

  • hakiri.pipeline_id, hakiri.chunk_id, hakiri.run_id
  • hakiri.chunk.predicate (e.g., created_at BETWEEN ... AND ...)
  • hakiri.chunk.rows, hakiri.chunk.bytes, hakiri.chunk.attempts
  • hakiri.chunk.duration_ms, hakiri.chunk.wait_ms (rate-limit waits)
  • hakiri.backfill.eta — recomputed per chunk from the remaining queue and recent throughput
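
One plausible way to recompute the ETA from the remaining queue and recent throughput is shown below. This Python sketch is an assumption about the formula, not the runtime's actual estimator, and estimate_eta_seconds is a hypothetical name.

```python
def estimate_eta_seconds(remaining_chunks, recent_durations_s, parallelism):
    """Remaining work divided by recent per-chunk speed and chunks in flight."""
    if remaining_chunks == 0:
        return 0.0
    if not recent_durations_s or parallelism < 1:
        return None  # not enough information to estimate yet
    average = sum(recent_durations_s) / len(recent_durations_s)
    return remaining_chunks * average / parallelism


eta = estimate_eta_seconds(remaining_chunks=8, recent_durations_s=[10.0, 20.0], parallelism=4)
print(eta)
```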

The MCP server exposes pipeline.status(id):

{
"pipeline_id": "github-issues-historical",
"phase": "backfilling",
"chunks": { "done": 47, "running": 4, "pending": 261, "total": 312 },
"throughput": { "rows_per_sec": 850, "bytes_per_sec": 1240000 },
"eta_seconds": 3690,
"started_at": "2026-05-12T08:14:00Z",
"rate_budget": "60/min",
"rate_budget_consumed_pct": 87
}

An agent can poll this between MCP calls to know when to stop waiting and start querying.

Operators occasionally need to re-pull a window — schema changed retroactively, source had bad data, retention reset. The runtime supports it without rewriting code:

hakiri backfill rewind github-issues \
--from 2024-01-01 --to 2024-06-30 \
--reason "vendor-side data correction RT-481"

This:

  1. Marks chunks overlapping the window as pending, preserving the original chunks for audit.
  2. Writes new Parquet under a fresh run_id; old files remain for the retention window.
  3. Compaction (see 04-context-store.md § Compaction) merges new and old on the next pass, with the newer _ingested_at winning under last-write-wins. The rewind reason is recorded in the catalog and surfaced in OTel lineage edges, so an auditor can trace why a snapshot row differs from its earlier shape.

A pipeline’s schedule = "every 15m" is honored differently per phase:

  • backfilling — the cron tick is a progress probe, not a re-run trigger. It dispatches up to max_chunks_per_tick pending chunks (if worker capacity allows) and otherwise no-ops. The phase advances when the last chunk completes.
  • streaming — normal incremental pull from the latest cursor.

This means an operator writing schedule = "every 15m" against a fresh 10-year backfill gets correct behavior automatically: chunks fill the worker pool, the backfill completes over hours-to-days, and the 15-minute cadence kicks in the moment the cursor catches up — no manifest rewrite, no “mark as initialized” step.
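
The per-phase handling of a scheduler tick can be summarized as a small decision function. The Python sketch below is illustrative: on_tick and its return values are hypothetical, and worker capacity is reduced to a single parallelism number.

```python
def on_tick(phase, pending, running, parallelism, max_chunks_per_tick):
    """Decide what a scheduled tick does; returns (action, chunks_to_dispatch)."""
    if phase == "streaming":
        return ("incremental_pull", 0)
    if phase == "backfilling":
        free_slots = max(parallelism - running, 0)
        count = min(pending, free_slots, max_chunks_per_tick)
        return ("dispatch", count) if count > 0 else ("noop", 0)
    return ("noop", 0)  # planning: no data moves yet


print(on_tick("backfilling", pending=261, running=4, parallelism=4, max_chunks_per_tick=8))
print(on_tick("backfilling", pending=261, running=1, parallelism=4, max_chunks_per_tick=8))
print(on_tick("streaming", pending=0, running=0, parallelism=4, max_chunks_per_tick=8))
```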

  • Every record gets a content hash on write.
  • The destination’s prepare() accepts a primary_key (from the source schema); if set, write() is upsert-by-PK.
  • Without a PK, writes are append-only and dedup is the consumer’s job.

The context destination implements PK semantics by writing each batch to a numbered Parquet file under runs/<run_id>/<table>/ and exposing a DuckDB view that does last-write-wins per PK at query time. Compaction collapses files into snapshots on a schedule.
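
The query-time behavior can be modeled outside DuckDB. The Python sketch below hashes each record and keeps the newest row per primary key. content_hash and latest_per_pk are hypothetical helpers, the canonical-JSON hashing scheme is an assumption, and _ingested_at is used as the ordering column.

```python
import hashlib
import json


def content_hash(record):
    """SHA-256 over a canonical JSON encoding, so key order does not matter."""
    payload = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def latest_per_pk(rows, primary_key, order_by="_ingested_at"):
    """Last-write-wins: keep the row with the greatest order_by value per key."""
    latest = {}
    for row in rows:
        key = row[primary_key]
        if key not in latest or row[order_by] >= latest[key][order_by]:
            latest[key] = row
    return sorted(latest.values(), key=lambda r: r[primary_key])


rows = [
    {"id": 1, "state": "open", "_ingested_at": "2026-05-11T08:00:00Z"},
    {"id": 2, "state": "open", "_ingested_at": "2026-05-11T08:00:00Z"},
    {"id": 1, "state": "closed", "_ingested_at": "2026-05-11T08:15:00Z"},
]
view = latest_per_pk(rows, "id")
print([(r["id"], r["state"]) for r in view])
```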

  • Singer compatibility shim. Worth shipping a hakiri singer <tap> that runs a Singer tap as a subprocess and adapts the JSONL stream? Cheap path to ecosystem coverage. Probably yes in M2.
  • dbt integration. If the destination is the context store, post-load transforms in dbt-style would be natural. Defer until a user asks.
  • Watermarks for late-arriving data. Skipped in v0; the cursor model assumes mostly-monotonic sources.