Pipeline Model
A pipeline is a declarative manifest
Hakiri accepts manifests in TOML (hakiri.toml, pipelines/*.toml) and JSON (pipelines/*.json). Both deserialize into the same PipelineSpec and validate against the same JSON Schema. Humans hand-edit TOML (comments round-trip, no YAML indent footguns); agents emit JSON (LLMs produce well-formed JSON reliably; $schema gives free validation and editor autocomplete).
TOML form (hakiri.toml)
    [project]
    name = "openhackers-warehouse"
    version = "0.1.0"

    [context]
    path = ".hakiri/context"           # local store location
    default_destination = "context"    # all pipelines land here unless overridden

    [sync]
    bucket = "r2://oh-context"
    endpoint = "https://<accountid>.r2.cloudflarestorage.com"
    region = "auto"
    # credentials via env: HAKIRI_SYNC_KEY_ID / HAKIRI_SYNC_KEY_SECRET

    [[pipeline]]
    id = "github-issues"
    source = { connector = "github", config = { repo = "OpenHackersClub/gctrl" } }
    tables = ["issues", "pull_requests", "comments"]
    schedule = "every 15m"
    incremental = "updated_at"         # cursor strategy

    [[pipeline]]
    id = "shopify-orders"
    source = { connector = "shopify", config = { shop = "openhackers" } }
    secrets = ["SHOPIFY_TOKEN"]
    tables = ["orders", "customers", "products"]
    schedule = "0 */2 * * *"                   # cron
    incremental = "updated_at"
    destination = { connector = "context" }    # explicit

JSON form (pipelines/<id>.json)
Identical semantics, same schema, same in-memory representation. The $schema line gives editor LSPs (VS Code, JetBrains, Helix) inline validation and autocomplete with no plugin beyond stock JSON Schema support.

    {
      "$schema": "https://hakiri.dev/schemas/v1/pipeline.json",
      "id": "github-issues",
      "source": { "connector": "github", "config": { "repo": "OpenHackersClub/gctrl" } },
      "tables": ["issues", "pull_requests", "comments"],
      "schedule": "every 15m",
      "incremental": "updated_at"
    }

The canonical schema lives in Rust

The single source of truth is the Rust type, derived once and consumed everywhere:

    use schemars::JsonSchema;
    use serde::{Serialize, Deserialize};

    #[derive(Serialize, Deserialize, JsonSchema)]
    pub struct PipelineSpec {
        pub id: String,
        pub source: SourceSpec,
        pub tables: Vec<String>,
        #[serde(default)]
        pub destination: Option<DestinationSpec>,
        #[serde(default)]
        pub schedule: Option<Schedule>,
        #[serde(default)]
        pub incremental: Option<String>,
        #[serde(default)]
        pub transforms: Vec<TransformSpec>,
    }

- serde_toml deserializes TOML → PipelineSpec
- serde_json deserializes JSON → PipelineSpec
- schemars derives the JSON Schema from PipelineSpec
hakiri schema export writes the generated schema to .hakiri/schema/pipeline.json; it’s also published at a stable URL (final domain TBD with the project name) for editor integration.
Runtime — merge manifests by id
On startup the runtime scans the project:

    hakiri.toml          # project-level config + inline [[pipeline]] blocks
    pipelines/*.toml     # one pipeline per file, typically human-edited
    pipelines/*.json     # one pipeline per file, typically agent-authored

Each file deserializes into PipelineSpec and is validated against the JSON Schema. Pipelines merge by id. Duplicate ids across files are a hard error with explicit diagnostics:

    error: pipeline `github-issues` defined in two places:
      hakiri.toml:42
      pipelines/github-issues.json:1
    hint: keep one source of truth; run `hakiri pipeline convert github-issues --to json` to migrate

Edits via MCP

- pipeline.create defaults to writing JSON at pipelines/<id>.json, the agent path of least resistance. Override with --format=toml to append to hakiri.toml.
- pipeline.edit preserves the existing file’s format. TOML edits go through toml_edit so comments and field ordering survive.
- pipeline.convert <id> --to json|toml produces the alternate form. Lossless except for TOML comments, which JSON can’t represent.
Why two formats
Each population uses the syntax that suits it (humans → TOML, agents → JSON) against one shared canonical schema in Rust. Rationale, alternatives considered (TOML-only, JSON-only, YAML), and trade-offs: see ADR-0003.
Plan, apply, reconcile
Hakiri operates on a desired-state model, like Terraform for infrastructure, Kubernetes controllers for cluster state, or dbt for warehouse models. The manifest declares what should exist; the runtime reconciles reality to it. There are no imperative scripts in the user-facing surface.

    # Show what would change without doing anything
    hakiri plan
    # → for each pipeline, compute the diff vs. current context state:
    #   - rows the source has that the destination doesn't (estimated)
    #   - schema changes that would be applied
    #   - new pipelines that haven't run yet

    # Apply all pipelines (= run each one to convergence)
    hakiri apply

    # Apply one pipeline
    hakiri apply github-issues

    # Continuous reconciliation (long-running)
    hakiri serve
    # → scheduler fires each pipeline on its declared schedule
    # → at any clean idle point, system state matches the manifest

Properties:

- Idempotent. hakiri apply twice with no source changes is a no-op (or only re-runs sources whose schedule has elapsed).
- Convergent. Interrupted runs resume from the last checkpoint; the final state is independent of how many crashes happened along the way.
- Inspectable. hakiri plan --json produces a structured diff agents can reason about before deciding to apply.
- Reviewable. Manifests live in git; the diff between two commits is the change to the system. Code review applies to data pipelines the same way it applies to infrastructure.
This is GitOps for data pipelines: commit the manifest, the runtime reconciles. No “first run the migration script, then the backfill, then the cutover”. The manifest is the program; the runtime is the interpreter.
Pipeline lifecycle
    stateDiagram-v2
      [*] --> Idle
      Idle --> Discovering: trigger
      Discovering --> Reading: schema OK
      Discovering --> Failed: schema incompatible
      Reading --> Writing: batch ready
      Writing --> Reading: more data
      Writing --> Committing: source exhausted
      Committing --> Idle: cursor saved, snapshot durable
      Reading --> Failed: transient (retries exhausted) / permanent
      Failed --> Idle: operator/agent decision

A trigger is one of:

- scheduled: cron or every <duration> expression
- manual: hakiri run <pipeline>
- event: a webhook into the daemon, an SQS message, a Cloudflare queue message
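The lifecycle diagram can also be read as a transition table. The following is a minimal sketch in Python, not Hakiri code: the state names come from the diagram, while the event labels (schema_ok, batch_ready, and so on) and the functions step and run are hypothetical names chosen for illustration.

```python
# Hypothetical transition table mirroring the lifecycle diagram.
# Event names are illustrative labels, not Hakiri identifiers.
TRANSITIONS = {
    ("Idle", "trigger"): "Discovering",
    ("Discovering", "schema_ok"): "Reading",
    ("Discovering", "schema_incompatible"): "Failed",
    ("Reading", "batch_ready"): "Writing",
    ("Reading", "error"): "Failed",
    ("Writing", "more_data"): "Reading",
    ("Writing", "source_exhausted"): "Committing",
    ("Committing", "committed"): "Idle",
    ("Failed", "decision"): "Idle",
}


def step(state: str, event: str) -> str:
    """Return the next state, or raise if the diagram has no such edge."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state!r} on {event!r}") from None


def run(events: list[str], state: str = "Idle") -> str:
    """Fold a sequence of events over the table, starting from Idle."""
    for event in events:
        state = step(state, event)
    return state
```

Keeping the edges in one dictionary makes illegal moves (for example, Idle straight to Writing) fail loudly instead of silently corrupting a run.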
State (cursors)
Each pipeline owns a Cursor per table. Cursors are opaque to the runtime; the connector defines their semantics. Three common shapes:

- Timestamp cursor: {"updated_at": "2026-05-11T08:00:00Z"} for any source with updated_at
- Numeric cursor: {"id": 12834782} for monotonic id sources
- Opaque token: {"next_page_token": "abc.."} for paginated APIs (GitHub, Notion)
Cursors live in ~/.hakiri/<project>/state.sqlite:
    CREATE TABLE pipeline_cursor (
      pipeline_id TEXT NOT NULL,
      table_name  TEXT NOT NULL,
      cursor_json TEXT NOT NULL,
      updated_at  TEXT NOT NULL,
      PRIMARY KEY (pipeline_id, table_name)
    );

    CREATE TABLE pipeline_run (
      run_id      TEXT PRIMARY KEY,
      pipeline_id TEXT NOT NULL,
      started_at  TEXT NOT NULL,
      ended_at    TEXT,
      status      TEXT NOT NULL,  -- pending | running | success | failed | partial
      row_count   INTEGER,
      bytes       INTEGER,
      error_kind  TEXT,
      error_msg   TEXT,
      trace_id    TEXT            -- OTel trace
    );

State sync rules:
- Cursors are last-write-wins across replicas — the source-of-truth is whichever replica wrote the most recent successful run. This is safe because cursors represent “I’ve read up to here”; advancing past a peer doesn’t lose data, only causes duplicate fetches that the destination deduplicates.
- Runs are append-only — every replica writes its own runs; merging is a union.
See 04-context-store.md for the wire format.
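The two sync rules can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the wire format: replicas are modeled as plain dictionaries, the function names are hypothetical, and the pipeline_cursor.updated_at column stands in for "most recent successful run". It assumes every replica writes updated_at as an ISO-8601 UTC string in the same layout, so string comparison orders the values chronologically.

```python
def merge_cursors(local: dict, remote: dict) -> dict:
    """Last-write-wins per (pipeline_id, table_name), keyed on updated_at.

    Assumes updated_at values share one ISO-8601 UTC layout, so comparing
    the strings compares the instants.
    """
    merged = dict(local)
    for key, row in remote.items():
        if key not in merged or row["updated_at"] > merged[key]["updated_at"]:
            merged[key] = row
    return merged


def merge_runs(local: dict, remote: dict) -> dict:
    """Runs are append-only, so merging is a union keyed on run_id."""
    return {**local, **remote}
```

Because the cursor merge picks the row with the larger timestamp, it gives the same result whichever replica applies it first, as long as the timestamps differ.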
Schema evolution
Hakiri infers schemas from incoming batches and reconciles against the stored schema:
| Change | Default behavior |
|---|---|
| New column added | Append it; backfill NULL for older rows |
| Column dropped from source | Keep stored column; new rows get NULL |
| Type widening (int32 → int64) | Allow; rewrite is lazy on next read |
| Type narrowing (int64 → int32) | Reject with SchemaIncompatible; emit guidance to either cast in transform or hakiri schema accept --pipeline X |
| Column renamed | Source must declare via cursor metadata; otherwise treated as drop+add |
| Primary key changed | Reject; requires explicit migration |
Schema decisions are recorded in the catalog, so hakiri schema log <pipeline> shows every evolution event.
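As a rough illustration of the table above, the sketch below classifies each column of an incoming schema against the stored one. It is a simplification under assumptions: schemas are plain name-to-type dictionaries, the function name and decision labels are hypothetical, the widening set contains only the int32 → int64 pair the table mentions, and every other type change is treated as a rejection. Renames and primary-key changes are not modeled.

```python
# Hypothetical widening set; the table only gives int32 -> int64 as an example.
WIDENINGS = {("int32", "int64")}


def classify(stored: dict[str, str], incoming: dict[str, str]) -> dict[str, str]:
    """Map each column name to a label for the table's default behavior."""
    decisions = {}
    for col, new_type in incoming.items():
        if col not in stored:
            decisions[col] = "append"      # new column, NULL for older rows
        elif stored[col] == new_type:
            decisions[col] = "unchanged"
        elif (stored[col], new_type) in WIDENINGS:
            decisions[col] = "widen"       # allowed, rewrite is lazy
        else:
            decisions[col] = "reject"      # SchemaIncompatible (assumed for all other changes)
    for col in stored:
        if col not in incoming:
            decisions[col] = "keep"        # dropped at source, new rows get NULL
    return decisions
```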
Transforms — Polars expressions, authored in Python or TypeScript
Hakiri ships a real transformation layer, but it does not embed a Python interpreter or a JavaScript runtime to run user code at pipeline time. Instead, transforms are Polars expression trees: authored in Python or TypeScript using the existing Polars bindings, serialized into the manifest, and executed entirely in Rust by Polars’s query engine.
Rationale and alternatives (PyO3-embedded Python, deno_core JS, DataFusion as the engine, custom DSL): see ADR-0010.
Why this shape
- Authoring stays in the language teams already use. Data and analytics engineers write Python; frontend-leaning teams write TypeScript. Both languages have first-class Polars bindings.
- Execution stays in Rust. Polars’s Rust engine evaluates the expression tree against Arrow batches; the Hakiri binary does not embed Python or Node, preserving the footprint budget (Pillar 1, Challenge 5).
- Sandboxed by construction. A Polars expression cannot open a socket, read the filesystem, or call out to a model API. The expression algebra is the sandbox.
- One canonical plan, three authoring surfaces. Rust authors use the Polars Rust API directly; Python authors use hakiri-py; TypeScript authors use hakiri-ts. All three produce the same serialized expression plan in the manifest.
Built-in algebra (covers the common case)
The expression algebra inherits Polars’s full set: select, with_columns, filter, rename, cast, sort, group_by + aggregations, join (between in-flight batches and pre-loaded reference tables), window functions, string and temporal expressions, regex, JSON-path extraction, struct/list operations. The manifest’s transforms = [...] list is a LazyFrame plan: composable, optimizable, and serializable.
Python authoring
    import hakiri as hk
    import polars as pl

    @hk.pipeline("github-issues")
    def github_issues_transforms(df: pl.LazyFrame) -> pl.LazyFrame:
        return (
            df.rename({"user.login": "author"})
            .with_columns([
                pl.col("created_at").cast(pl.Datetime("ms", "UTC")),
                pl.col("body").str.len_chars().alias("body_len"),
                pl.col("labels").list.eval(pl.element().struct.field("name")).alias("label_names"),
            ])
            .filter(pl.col("state") == "open")
        )

    # Compile to the manifest's expression plan:
    # $ hakiri compile transforms/github_issues.py
    # → writes pipelines/github-issues.transforms.json (Polars's serialized plan format)

TypeScript authoring

    import { pipeline, pl } from "hakiri";

    export default pipeline("github-issues", (df) =>
      df.rename({ "user.login": "author" })
        .withColumns([
          pl.col("created_at").cast(pl.Datetime("ms", "UTC")),
          pl.col("body").str.lenChars().alias("body_len"),
          pl.col("labels").list.eval(pl.element().struct.field("name")).alias("label_names"),
        ])
        .filter(pl.col("state").eq("open"))
    );

    // Compile via `hakiri compile transforms/github_issues.ts`

Manifest after compilation

The compiled artifact is JSON the runtime deserializes into a LazyFrame plan. It is checked into the repo alongside the source .py/.ts for reproducibility and review:

    {
      "$schema": "https://hakiri.dev/schemas/v1/transforms.json",
      "pipeline_id": "github-issues",
      "engine": "polars",
      "engine_version": "0.43.0",
      "source_lang": "python",
      "source_hash": "sha256:…",
      "plan": { /* Polars's serialized LazyFrame plan */ }
    }

The runtime reads only the plan field; the source_* fields are provenance. PR review is on the diff of either the source file (if checked in) or the plan JSON.
Where transforms run in the pipeline graph
    flowchart LR
      Source[Source connector<br/>(WASM Component)] -->|Arrow RecordBatch| Transform[Transforms<br/>(Polars LazyFrame plan)]
      Transform -->|Arrow RecordBatch| Destination[Destination<br/>(context store / sync)]
The connector emits Arrow record batches; the transform plan rewrites them; the destination consumes them. Polars’s columnar engine works on the same Arrow buffers the connector produced — zero-copy through the pipeline.
When Polars expressions are not enough
Polars covers ~95% of the per-batch transform needs we expect. For the remainder (calling a model for entity extraction, hitting a sidecar enrichment service, anything stateful across batches), write a WASM Component that implements a small transform WIT interface, sister to the source/destination interfaces from ADR-0001:

    // in wit/connector.wit
    interface transform {
      use types.{record-batch, error};
      prepare: func(schema-in: schema) -> result<schema, error>;
      apply: func(batch: record-batch) -> result<record-batch, error>;
    }

Python authors compile to WASM via componentize-py; TypeScript authors via jco. The component runs in the same Wasmtime host as connectors, with the same capability-declared host access: a “call an embedding model” transform declares the network capability and the operator grants it, exactly as for connectors.
This keeps the two-tier story clean: declarative Polars expressions for the common case, sandboxed WASM Components for the irregular case. No third tier (no embedded Python at runtime, no JS interpreter).
Pre-landing vs post-landing transforms
The thesis from ELT (keep transforms close to the destination) still holds for business-logic transforms (joins across pipelines, large aggregations, dimensional models). Those belong in SQL views over the context store, not in pipeline transforms.
| Use a Polars transform when… | Use a SQL view in the context store when… |
|---|---|
| The transform is per-source-batch and stateless | The transform joins across multiple tables |
| Type casting, renaming, struct flattening, list/regex extraction | Aggregations the agent will query repeatedly |
| Filtering before write (drop heartbeat events) | Dimensional modeling, slowly-changing dimensions |
| Per-record redaction tied to source schema | Cross-source customer-360 views |
Both compose: a Polars transform shapes the landing format; a DuckDB view shapes the agent-facing query surface.
Scheduling
Two strategies, pick per pipeline:

- In-process scheduler: runs while hakiri serve is up. Good for self-hosted.
- External trigger: hakiri run <pipeline> from a cron, Cloudflare Workflow, GitHub Action, or AWS EventBridge rule. The binary exits 0/non-zero per run.
The scheduler is intentionally simple: no DAG dependencies, no pipeline_A.success → pipeline_B.start orchestration in v0. If you need a DAG, drive it from outside (Workflows / Step Functions / Airflow). This keeps the runtime’s job narrow.
Backfill orchestration
Backfilling (pulling a source’s historical data on first run before settling into incremental sync) is the common case, not the edge case. A first run against GitHub Issues for a 10-year-old repo is hours of paginated reads; against a 50M-row Postgres table it’s a multi-day CDC seed. The runtime treats backfill as a first-class pipeline phase with its own scheduling, parallelism, and resume semantics, not as “just a long first run that might time out.”
Phases
A pipeline has three observable phases. The catalog (meta.sqlite) records the current phase per pipeline.

    stateDiagram-v2
      [*] --> planning
      planning --> backfilling: chunks discovered
      backfilling --> backfilling: chunk completed,<br/>more remain
      backfilling --> streaming: cursor caught up to "now"
      streaming --> streaming: incremental tick
      streaming --> backfilling: gap detected<br/>(operator-driven)

- planning: discover() is called; the runtime computes the chunk plan from declared cursor kind, source statistics, and the operator’s backfill_window config. No data is fetched yet.
- backfilling: chunks execute, possibly across many scheduler ticks, possibly in parallel where the source allows. Each completed chunk commits an independent Parquet file and advances the catalog’s per-chunk cursor.
- streaming: the cursor has caught up to the source’s “now” within the declared SLA. Subsequent scheduled ticks pull only the incremental delta.
While a pipeline is backfilling, scheduled cron ticks do not start a fresh run — they either extend the backfill (dispatch more chunks) or no-op if the chunk queue is full. This prevents the “every 15 minutes a new pull starts but the previous one is still backfilling” pile-up that bites naïve schedulers.
Chunk plan — what gets backfilled in what unit
The chunk plan is the unit of resumability and parallelism. The shape depends on the source’s cursor-kind (declared via the WIT export from 02-connectors.md, see also ADR-0005):

| cursor-kind | Natural chunk | Parallelism | Example |
|---|---|---|---|
| monotonic (timestamp / autoincrement) | Time window or id range | Safe across chunks; each is independent | created_at BETWEEN '2020-01-01' AND '2020-02-01' for one chunk |
| opaque-token (vendor page token) | Page-token segment, sequential by design | None; tokens are stateful on the vendor side and must be walked in order | Stripe starting_after, GitHub Link headers |
| snapshot-id (LSN, version-id) | LSN range | Sequential per stream; parallel across streams | Postgres logical replication: backfill the table snapshot, then replay WAL from a noted LSN |
| partitioned-monotonic (declared by the connector) | Partition × time window | Safe across partitions | GitHub Issues by repo; one chunk per (repo, month), since repos are independent |
Connectors declare their cursor kind and any partition keys via a small WIT export the runtime reads at discover():
    // in wit/connector.wit
    record backfill-hint {
      cursor-kind: cursor-kind,             // see ADR-0005
      partition-keys: list<string>,         // empty = single-stream
      natural-window: option<duration>,     // hint for time-windowed chunking
      rate-budget: option<rate-budget>,     // requests/sec the source tolerates
    }

    interface source {
      backfill-hint: func() -> backfill-hint;
      // ... rest of the source interface
    }

The runtime builds the chunk plan from the hint plus operator overrides (backfill_window = "30d", backfill_parallelism = 4). The plan is stored in the catalog as pipeline_chunks(pipeline_id, chunk_id, predicate, status, attempts, started_at, completed_at, cursor_committed) so progress survives crashes.
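For a monotonic timestamp cursor, the chunk plan amounts to slicing a date range into fixed windows. The sketch below shows one way to do that in Python; it is not the runtime's planner. The function name and its parameters are hypothetical, and it uses half-open windows [lo, hi) so adjacent chunks never overlap, which is a choice made here rather than something the design specifies.

```python
from datetime import date, timedelta


def plan_chunks(start: date, end: date, window_days: int) -> list[tuple[date, date]]:
    """Split [start, end) into consecutive half-open windows of window_days."""
    if window_days <= 0:
        raise ValueError("window_days must be positive")
    chunks = []
    lo = start
    while lo < end:
        # The final window is clipped so it never runs past `end`.
        hi = min(lo + timedelta(days=window_days), end)
        chunks.append((lo, hi))
        lo = hi
    return chunks
```

Each (lo, hi) pair would become one chunk predicate, for example created_at >= lo AND created_at < hi, and one row in pipeline_chunks.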
Operator controls
    [[pipeline]]
    id = "github-issues-historical"
    source = { connector = "github", config = { repo = "torvalds/linux" } }
    tables = ["issues"]
    incremental = "updated_at"

      [pipeline.backfill]
      window = "30d"               # one chunk per 30-day slice
      parallelism = 4              # how many chunks in flight
      rate_budget = "60/min"       # cap source requests
      start_from = "2010-01-01"    # don't go further back than this
      max_chunks_per_tick = 8      # how many chunks to dispatch per scheduler tick

Defaults come from the connector’s backfill-hint; the operator only sets what they want to override. The runtime warns at hakiri plan time if the configured parallelism violates the connector’s declared safety (opaque-token sources reject parallelism > 1).
Crash resume — node-id + attempt-isolated writes
Every chunk is its own atomic unit, with writes isolated per node and per attempt so a crash never produces a mixed-author chunk directory:

1. Lease. Worker acquires a row-level lock on pipeline_chunks(chunk_id, attempt) (catalog backend: SELECT FOR UPDATE SKIP LOCKED on Postgres; DO actor scope on Cloudflare; BEGIN IMMEDIATE + retry on SQLite; see ADR-0011 for the per-backend primitive). Lease records the worker’s node-id and the attempt number.

2. Pull, write to isolated path. Connector’s open(predicate, cursor=None) opens the chunk; the runtime iterates next() and writes Parquet batches under:

       tables/<t>/data/runs/<run-id>/<node-id>/chunk-<chunk-id>/attempt-<n>/
         part-00000.parquet
         part-00001.parquet
         ...

   This path is never shared between workers or attempts. Worker A writing chunk-47 attempt-1 and worker B writing chunk-47 attempt-2 produce disjoint files even if A’s leftovers survive the crash.

3. Commit. Once next() returns none, the runtime writes the chunk’s _manifest.json to the attempt directory, then in a single catalog transaction:
   - Sets pipeline_chunks(chunk_id).status = 'done'.
   - Sets pipeline_chunks(chunk_id).winning_attempt = <node-id>/attempt-<n>.

   The catalog row, not directory presence, is the commit signal. Queries against the run read the catalog to find the winning attempt; non-winning attempt directories are GC’d after the retention window.

4. Crash. If the worker dies mid-chunk:
   - The lease expires after the TTL (default 10 minutes).
   - Another worker claims the chunk and increments attempt (chunk-47 now has attempt-2 lease).
   - The new worker writes to its own <node-id>/chunk-47/attempt-2/ path. It never touches the leftover files from attempt-1.
   - Once attempt-2 commits, the catalog records it as winning; attempt-1’s leftovers are GC’d.
   - If attempt-1 was somehow already partially committed (rare: the worker crashed after writing the catalog row but before the lease expired, only possible in a small clock-skew window), the existing row wins and attempt-2 doesn’t claim.

The invariant: the catalog row is the commit point; directory presence alone is never the truth. A reader that finds Parquet without a corresponding catalog done row treats it as in-flight or orphaned. The cleanup task GCs orphans after a retention window.
Independent commit means a 312-chunk backfill that crashes at chunk 247 resumes at 248 — the prior 247 chunks are durable Parquet pointed at by durable catalog rows, not in-memory progress.
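The "catalog row is the commit point" invariant can be demonstrated with SQLite from the Python standard library. This is a sketch under assumptions, not the runtime's catalog code: the table is reduced to three columns, the helper names are hypothetical, and the "first committed attempt wins" rule is expressed as a guarded UPDATE inside one transaction.

```python
import sqlite3


def open_catalog() -> sqlite3.Connection:
    """In-memory catalog with a simplified pipeline_chunks table (assumed shape)."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE pipeline_chunks ("
        " chunk_id INTEGER PRIMARY KEY,"
        " status TEXT NOT NULL DEFAULT 'pending',"
        " winning_attempt TEXT)"
    )
    return db


def commit_attempt(db: sqlite3.Connection, chunk_id: int, attempt: str) -> bool:
    """Mark the chunk done for this attempt; return False if another attempt already won."""
    with db:  # one transaction: commits on success, rolls back on error
        cur = db.execute(
            "UPDATE pipeline_chunks SET status = 'done', winning_attempt = ?"
            " WHERE chunk_id = ? AND status != 'done'",
            (attempt, chunk_id),
        )
    return cur.rowcount == 1
```

A late attempt that finishes after another has committed gets False back and leaves its Parquet files as orphans for the cleanup task, which matches the reader rule that files without a done row are never trusted.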
Parallel execution
Workers (in hakiri serve, in CF Containers, or in Fargate tasks) pull chunks from the catalog’s claim queue:

    -- worker claim
    UPDATE pipeline_chunks
    SET holder_node = $worker_id,
        leased_until = now() + interval '10 min',
        status = 'running'
    WHERE chunk_id = (
      SELECT chunk_id FROM pipeline_chunks
      WHERE pipeline_id = $pid AND status = 'pending'
      ORDER BY chunk_id ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING chunk_id;

FOR UPDATE SKIP LOCKED (Postgres) and equivalent atomic CAS (DO, DynamoDB) make the claim queue contention-free under high worker counts. Per-pipeline parallelism caps how many chunks can be running concurrently; the runtime enforces it via a counted semaphore in the catalog.
Workflow-shaped clouds (CF Workflows, AWS Step Functions)
ADR-0009 and 06-deployment.md describe how the first-class clouds host pipelines. Backfill maps onto their primitives without rewriting the manifest:
| Concern | Cloudflare Workflows | AWS Step Functions |
|---|---|---|
| One workflow instance per pipeline run | yes (WorkflowEntrypoint) | yes (state machine execution) |
| Workflow step → chunk | one step.do() per chunk | one Task state per chunk |
| Step result size cap | 1 MiB → step returns R2 key, not payload | 256 KiB → state returns S3 key |
| Steps per workflow cap | 1024 steps | 25,000 transitions (Standard) |
| Long backfill (> per-workflow cap) | Reconciler dispatches a fresh workflow instance per ≤1000-chunk batch | Step Functions parent-child or new execution per batch |
| Long backfill (> 6h wall) | Workflow chains across instances at the next cron tick | Standard workflow extends; Express needs Fargate |
| Cold-start cost | Container ~2–8s (keep-warm during backfill) | Fargate Task warm during backfill |
The runtime knows when it’s running under a workflow host and decomposes accordingly — same pipeline_chunks table, same chunk semantics, different orchestration primitive on top.
Backpressure and cost control
Three knobs the operator can set, surfaced in hakiri plan output before any data moves:

- rate_budget: caps source API calls. Hard-enforced via a token bucket the runtime owns; connectors see throttled next() returns, never 429s from the source.
- destination_writes_per_sec: caps Parquet flush rate. Useful when destination is R2 in a different region and egress matters.
- max_parallel_chunks: global cap across all backfilling pipelines on this worker. Prevents one 10-pipeline project from saturating the worker pool.
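The rate_budget knob is described as a token bucket. A minimal Python sketch of that technique follows; the class and its parameters are hypothetical and say nothing about how the runtime implements it. The clock is passed in explicitly so the behavior is deterministic, and a budget such as "60/min" would correspond to rate_per_sec = 1.0 here.

```python
class TokenBucket:
    """Token bucket with an injected clock (seconds as a float)."""

    def __init__(self, rate_per_sec: float, capacity: float, now: float = 0.0):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.last = now

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        """Refill for elapsed time, then spend `cost` tokens if available."""
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = max(self.last, now)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

A caller that gets False would wait and retry rather than hit the source, which is how a connector could see throttled next() returns instead of 429 responses.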
Observability
Each chunk emits an OTel span with:

- hakiri.pipeline_id, hakiri.chunk_id, hakiri.run_id
- hakiri.chunk.predicate (e.g., created_at BETWEEN ... AND ...)
- hakiri.chunk.rows, hakiri.chunk.bytes, hakiri.chunk.attempts
- hakiri.chunk.duration_ms, hakiri.chunk.wait_ms (rate-limit waits)
- hakiri.backfill.eta: recomputed per chunk from the remaining queue and recent throughput
The MCP server exposes pipeline.status(id):
    {
      "pipeline_id": "github-issues-historical",
      "phase": "backfilling",
      "chunks": { "done": 47, "running": 4, "pending": 261, "total": 312 },
      "throughput": { "rows_per_sec": 850, "bytes_per_sec": 1240000 },
      "eta_seconds": 3690,
      "started_at": "2026-05-12T08:14:00Z",
      "rate_budget": "60/min",
      "rate_budget_consumed_pct": 87
    }

An agent can poll this between MCP calls to know when to stop waiting and start querying.
Re-backfill (“rewind”)
Operators occasionally need to re-pull a window: schema changed retroactively, source had bad data, retention reset. The runtime supports it without rewriting code:

    hakiri backfill rewind github-issues \
      --from 2024-01-01 --to 2024-06-30 \
      --reason "vendor-side data correction RT-481"

This:

- Marks chunks overlapping the window as pending, preserving the original chunks for audit.
- Writes new Parquet under a fresh run_id; old files remain for the retention window.
- Compaction (see 04-context-store.md § Compaction) merges new and old on the next pass, with the newer _ingested_at winning under last-write-wins.

The rewind reason is recorded in the catalog and surfaced in OTel lineage edges, so an auditor can trace why a snapshot row differs from its earlier shape.
Backfill vs streaming under one schedule
A pipeline’s schedule = "every 15m" is honored differently per phase:

- backfilling: the cron tick is a progress probe, not a re-run trigger. It dispatches up to max_chunks_per_tick pending chunks (if worker capacity allows) and otherwise no-ops. The phase advances when the last chunk completes.
- streaming: normal incremental pull from the latest cursor.
This means an operator writing schedule = "every 15m" against a fresh 10-year backfill gets correct behavior automatically: chunks fill the worker pool, the backfill completes over hours-to-days, and the 15-minute cadence kicks in the moment the cursor catches up — no manifest rewrite, no “mark as initialized” step.
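The per-phase tick behavior reduces to a small calculation. The sketch below is one possible reading, not the scheduler's code: the function name is hypothetical, and "worker capacity" is interpreted here as the configured parallelism minus the chunks already running, which is an assumption.

```python
def chunks_to_dispatch(phase: str, pending: int, running: int,
                       parallelism: int, max_chunks_per_tick: int) -> int:
    """How many pending chunks one scheduler tick should dispatch."""
    if phase != "backfilling":
        return 0  # streaming ticks run a normal incremental pull instead
    # Assumed capacity model: free slots under the per-pipeline parallelism cap.
    free_slots = max(0, parallelism - running)
    return min(pending, free_slots, max_chunks_per_tick)
```

When every slot is busy the tick dispatches nothing, which is the no-op that avoids stacking a new pull on top of a backfill still in progress.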
Idempotency & deduplication
- Every record gets a content hash on write.
- The destination’s prepare() accepts a primary_key (from the source schema); if set, write() is upsert-by-PK.
- Without a PK, writes are append-only and dedup is the consumer’s job.
The context destination implements PK semantics by writing each batch to a numbered Parquet file under runs/<run_id>/<table>/ and exposing a DuckDB view that does last-write-wins per PK at query time. Compaction collapses files into snapshots on a schedule.
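Last-write-wins per primary key is easy to state in code. The Python sketch below mimics, over in-memory rows, what the query-time view is described as doing; it is not the DuckDB view itself. The function name is hypothetical, and ordering by an _ingested_at field is an assumption borrowed from the compaction rule in the rewind section.

```python
def latest_per_pk(rows: list[dict], pk: str, order_by: str = "_ingested_at") -> list[dict]:
    """Keep the newest row per primary key; on ties, the row seen later wins."""
    winners: dict = {}
    for row in rows:
        current = winners.get(row[pk])
        if current is None or row[order_by] >= current[order_by]:
            winners[row[pk]] = row
    return list(winners.values())
```

Compaction can apply the same rule once and store the result, so later queries read a snapshot instead of re-resolving every file.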
Open questions
- Singer compatibility shim. Worth shipping a hakiri singer <tap> that runs a Singer tap as a subprocess and adapts the JSONL stream? Cheap path to ecosystem coverage. Probably yes in M2.
- dbt integration. If the destination is the context store, post-load transforms in dbt-style would be natural. Defer until a user asks.
- Watermarks for late-arriving data. Skipped in v0; the cursor model assumes mostly-monotonic sources.