Two independent changes: per-job DynamoDB task sharding and idempotency replay (SPEC §7.2).
All of a job's task + event rows lived under one partition (Job#<id>). DynamoDB caps a single partition at ~1000 WCU/s, so a 50k+ task submit couldn't finish its writes inside the 60 s ALB idle window → 504s. (The API is on Fargate now, so the old Lambda body/time limits are gone; the ALB wall is what remains.)
flowchart LR
T["tasks (submit / add_tasks /
rerun / scheduled-fire)"]
T -->|"fnv32a(task_id) %% N"| B0[("Job#id#TS0")]
T --> B1[("Job#id#TS1")]
T --> Bn[("Job#id#TS…N-1")]
T -.->|"N ≤ 1 (legacy)"| J[("Job#id (JobPK)")]
B0 & B1 & Bn -->|"parallel BatchWriteItem
(errgroup, bucketed by shard)"| OK["≈ N × 1000 WCU/s"]
T ==>|"ShardCount on every TaskRef"| Q[["SQS task queue"]]
Q --> W["worker derives partition
from ref.ShardCount
(no extra GetJob)"]
N is pinned on the job row at submit (task_shard_count, default 16 via CONVEYOR_TASK_SHARD_COUNT). Every reader/writer consults the on-row value — never the live config — so a job's layout is immutable for its life.
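A minimal sketch of the key derivation described above, assuming the standard library's 32-bit FNV-1a is what `fnv32a` refers to. The helper names `shardFor` and `taskPartitionKey` are hypothetical and not taken from the PR; the key shapes follow the diagram.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a task ID to a shard index in [0, n) using 32-bit FNV-1a.
// Hypothetical helper; n must be >= 1.
func shardFor(taskID string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(taskID)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(n))
}

// taskPartitionKey returns the partition key for a task row.
// shardCount must be the value pinned on the job row, not the live config.
func taskPartitionKey(jobID, taskID string, shardCount int) string {
	if shardCount <= 1 {
		return "Job#" + jobID // legacy single-partition layout
	}
	return fmt.Sprintf("Job#%s#TS%d", jobID, shardFor(taskID, shardCount))
}
```

Because the shard index depends only on `task_id` and the pinned count, a worker holding a TaskRef with ShardCount can compute the same key without reading the job row.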
flowchart TD
R["list_results · collectRunTasks (rerun) · delete scrub"] --> FO{"shardCount?"}
FO -->|"≤ 1"| SP["single-partition Query
plain task_id cursor"]
FO -->|"> 1"| FAN["N parallel Queries
each ≤ targetItems"]
FAN --> MERGE["k-way merge by task_id
→ dense page of targetItems"]
MERGE --> CUR["composite cursor
sh:base64([per-shard last id])"]
CUR -.->|"in-flight plain cursor"| S0["resumes on shard 0 only"]
Each shard is already SK-sorted by task_id, so picking the global min repeatedly reproduces the exact lexicographic order pre-shard callers saw. Cursors that don't carry the sh: magic prefix are treated as legacy plain task_ids.
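The merge and cursor handling can be sketched as follows. This is an illustration under assumptions, not the PR's code: `mergePage`, `encodeCursor`, and `decodeCursor` are hypothetical names, the JSON payload inside the base64 is a guess at the encoding, and "resumes on shard 0 only" is read here as applying the plain task_id to shard 0 while the other shards start from the beginning.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"strings"
)

const cursorPrefix = "sh:"

// mergePage takes one sorted slice of task IDs per shard (each already
// positioned after that shard's cursor) and returns the first targetItems
// IDs in global lexicographic order, plus the updated per-shard last IDs.
func mergePage(shards [][]string, last []string, targetItems int) (page, next []string) {
	next = append([]string(nil), last...)
	pos := make([]int, len(shards))
	for len(page) < targetItems {
		best := -1
		for s := range shards {
			if pos[s] >= len(shards[s]) {
				continue // this shard's fetched rows are used up
			}
			if best == -1 || shards[s][pos[s]] < shards[best][pos[best]] {
				best = s
			}
		}
		if best == -1 {
			break // every shard exhausted
		}
		id := shards[best][pos[best]]
		page = append(page, id)
		next[best] = id
		pos[best]++
	}
	return page, next
}

// encodeCursor packs the per-shard last IDs behind the "sh:" prefix.
func encodeCursor(last []string) (string, error) {
	raw, err := json.Marshal(last)
	if err != nil {
		return "", err
	}
	return cursorPrefix + base64.RawURLEncoding.EncodeToString(raw), nil
}

// decodeCursor returns one last-seen ID per shard ("" means start of shard).
// A cursor without the prefix is treated as a legacy plain task_id.
func decodeCursor(cur string, n int) ([]string, error) {
	if n < 1 {
		return nil, errors.New("shard count must be >= 1")
	}
	last := make([]string, n)
	if cur == "" {
		return last, nil
	}
	if !strings.HasPrefix(cur, cursorPrefix) {
		last[0] = cur // assumed reading of "resumes on shard 0 only"
		return last, nil
	}
	raw, err := base64.RawURLEncoding.DecodeString(strings.TrimPrefix(cur, cursorPrefix))
	if err != nil {
		return nil, err
	}
	var decoded []string
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, err
	}
	if len(decoded) != n {
		return nil, errors.New("cursor shard count does not match job")
	}
	return decoded, nil
}
```

A linear scan for the minimum is used instead of a heap; with N around 16 the difference is unlikely to matter.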
stateDiagram-v2
[*] --> First: PutIfAbsent succeeds
First --> Replay: CommitResponse, then retry (same fp + snapshot)
[*] --> Conflict_fp: existing row, fingerprint differs / empty (legacy)
[*] --> Conflict_inflight: existing row, same fp, no snapshot (in-flight / crashed)
Replay --> [*]: echo stored status + body verbatim
Conflict_fp --> [*]: 409
Conflict_inflight --> [*]: 409
First submit stores the request fingerprint on claim and the response snapshot on commit; a matching retry replays the cached status + body. Everything else 409s — same as the old "duplicate key → 409", now scoped to genuine conflicts. Also fixes a latent bug: idemToRow dropped the TTL, so idempotency rows never expired.
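The outcomes above reduce to a small decision function. The sketch below is illustrative only: `idemRow`, `outcome`, and `classify` are hypothetical names, and it assumes the caller has already attempted the conditional put and passes `nil` when no row existed.

```go
package main

// idemRow is a hypothetical view of a stored idempotency row.
type idemRow struct {
	Fingerprint string // empty on legacy rows written before this change
	HasSnapshot bool   // true once the response snapshot has been committed
	Status      int    // stored response status, valid when HasSnapshot
	Body        []byte // stored response body, valid when HasSnapshot
}

type outcome int

const (
	outcomeFirst    outcome = iota // claim succeeded; handle the request
	outcomeReplay                  // echo the stored status + body verbatim
	outcomeConflict                // respond 409
)

// classify decides what to do with a request whose fingerprint is fp,
// given the row found under its idempotency key (nil if none existed).
func classify(existing *idemRow, fp string) outcome {
	switch {
	case existing == nil:
		return outcomeFirst
	case existing.Fingerprint == "" || existing.Fingerprint != fp:
		return outcomeConflict // legacy row or different request body
	case !existing.HasSnapshot:
		return outcomeConflict // first attempt in flight or crashed
	default:
		return outcomeReplay
	}
}
```

Only the last branch replays, which matches the note that a failed best-effort commit leaves a later retry with a 409 rather than a replay.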
task_shard_count, response_status) are undocumented. Repo rule: a storage-layout/field change updates SPEC.md in the same PR, and the spec wins on disagreement.
targetItems from each of N shards, merges down to targetItems, and the next page re-reads the discarded tail. Full enumeration of a T-task job costs ≈ N × T row-reads (16× at the default) — paid on every list_results and on collectRunTasks during rerun. Inherent to scatter-gather; worth a metric and maybe a smaller default N.
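To put a number on the N × T estimate, the model below counts row-reads for one full enumeration. It is a back-of-the-envelope sketch, not a measurement: it assumes tasks are spread perfectly round-robin across shards (the real placement is hash-based), that every page re-queries each shard for up to `p` rows, and that `rowReads` and `countBelow` are hypothetical names.

```go
package main

// countBelow returns how many ids in [0, c) satisfy id % n == s.
func countBelow(c, s, n int) int {
	if c <= s {
		return 0
	}
	return (c - s + n - 1) / n
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// rowReads models a full enumeration of t tasks over n shards with page
// size p: each page reads up to p rows from every shard, keeps the p
// globally smallest, and discards the rest for the next page to re-read.
func rowReads(t, n, p int) int {
	reads := 0
	for c := 0; c < t; { // c = tasks already returned to the caller
		for s := 0; s < n; s++ {
			remaining := countBelow(t, s, n) - countBelow(c, s, n)
			reads += minInt(p, remaining)
		}
		c += minInt(p, t-c)
	}
	return reads
}
```

With a single shard the model reads each row exactly once. With N = 16 the total approaches, but stays below, 16 × T, because shards return short reads once fewer than `p` rows remain in them.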
response_status 202 comment is forward-looking) · decodeBodyAndHash buffers the whole body for the raw-bytes fingerprint — fine off Lambda, no size cap (neither had one) · a failed best-effort commit turns a would-be replay into a 409 (no worse than before) · the legacy-plain-cursor → shard-0 branch is effectively dead but harmless.
ShardCount on TaskRef avoids a per-task GetJob · solid tests (distribution, cursor round-trip, shard-count drift, idempotency outcomes) · the TTL fix is a real catch.
go build ./... ✓ · go vet ./internal/... ✓ · go test green: idempotency, storage/ddb, storage/memstore, worker/{control,task,dispatch}