primkit

Architecture

Primkit is a monorepo containing four primitives (taskprim, stateprim, knowledgeprim, and queueprim) and a shared infrastructure library (primkit). All primitives follow the same layered architecture. They differ only in the domain model, the store operations, and two primitive-specific components: the embedding layer (knowledgeprim) and the background sweeper (queueprim).

Repository Structure

primkit/
├── go.work                  # Go workspace (5 modules)
├── Makefile                 # build, test, lint, fmt, build-pi
├── config.example.yaml      # shared config format
├── primkit/                 # shared library (module: github.com/propifly/primkit/primkit)
│   ├── config/              # YAML config loader + env var interpolation
│   ├── db/                  # SQLite open + migration runner
│   ├── server/              # HTTP server, middleware, JSON helpers
│   ├── auth/                # Bearer token authentication
│   ├── mcp/                 # MCP server scaffold
│   └── replicate/           # Litestream WAL replication wrapper
├── taskprim/                # task management primitive (module: github.com/propifly/primkit/taskprim)
│   ├── cmd/taskprim/        # main.go entry point
│   └── internal/
│       ├── model/           # Task, Filter, state machine
│       ├── store/           # Store interface + SQLite implementation
│       ├── cli/             # Cobra commands (add, list, done, kill, ...)
│       ├── api/             # HTTP API handler
│       └── mcpserver/       # MCP tool registration
├── stateprim/               # state persistence primitive (module: github.com/propifly/primkit/stateprim)
│   ├── cmd/stateprim/       # main.go entry point
│   └── internal/
│       ├── model/           # Record, QueryFilter
│       ├── store/           # Store interface + SQLite implementation
│       ├── cli/             # Cobra commands (set, get, append, query, ...)
│       ├── api/             # HTTP API handler
│       └── mcpserver/       # MCP tool registration
├── knowledgeprim/           # knowledge graph primitive (module: github.com/propifly/primkit/knowledgeprim)
│   ├── cmd/knowledgeprim/   # main.go entry point
│   └── internal/
│       ├── model/           # Entity, Edge, SearchFilter, TraversalOpts, DiscoverOpts
│       ├── store/           # Store interface + SQLite implementation (FTS5, vectors)
│       ├── embed/           # Embedding provider abstraction (Gemini, OpenAI, custom)
│       ├── cli/             # Cobra commands (capture, search, connect, discover, ...)
│       ├── api/             # HTTP API handler
│       └── mcpserver/       # MCP tool registration
└── queueprim/               # work queue primitive (module: github.com/propifly/primkit/queueprim)
    ├── cmd/queueprim/       # main.go entry point
    └── internal/
        ├── model/           # Job, Filter, Priority, Status, QueueInfo, Stats
        ├── store/           # Store interface + SQLite implementation
        ├── cli/             # Cobra commands (enqueue, dequeue, complete, fail, ...)
        ├── api/             # HTTP API handler
        └── mcpserver/       # MCP tool registration

Layered Design

Dependencies flow strictly downward. No lateral dependencies between sibling layers.

┌──────────────────────────────────────────────────────────┐
│                    Access Interfaces                      │
│  ┌───────┐    ┌──────────┐    ┌───────────────┐          │
│  │  CLI  │    │ HTTP API │    │  MCP Server   │          │
│  │(cobra)│    │(net/http)│    │  (mcp-go)     │          │
│  └───┬───┘    └────┬─────┘    └──────┬────────┘          │
│      │             │                 │                    │
│      └─────────────┼─────────────────┘                    │
│                    │                                      │
│    ┌───────────────┼───────────────────┐                  │
│    │               │                   │                  │
│    │         ┌─────▼─────┐     ┌───────▼────────┐        │
│    │         │   Store   │     │    Embedder    │        │
│    │         │(interface)│     │  (interface)   │        │
│    │         └─────┬─────┘     │ knowledgeprim  │        │
│    │               │           │     only       │        │
│    │         ┌─────▼─────┐     └────────────────┘        │
│    │         │   Model   │ ◄── structs,                  │
│    │         │           │     validation,               │
│    │         └─────┬─────┘     state machine             │
│    │               │                                      │
│    └───────┬───────┼────────────────┐                     │
│            │       │                │                     │
│  ┌─────────▼┐ ┌────▼───────┐  ┌─────▼──────┐             │
│  │  config  │ │     db     │  │  replicate │             │
│  │  (YAML)  │ │  (SQLite)  │  │(Litestream)│             │
│  └──────────┘ └────────────┘  └────────────┘             │
│                                                          │
│                 primkit (shared library)                  │
└──────────────────────────────────────────────────────────┘

Note: The Embedder interface is unique to knowledgeprim. The background sweeper goroutine (for expired claim release) is unique to queueprim. taskprim, stateprim, and queueprim do not have an embedding layer.

Key Constraint

CLI, API, and MCP are sibling consumers of the Store interface. They never depend on each other. This means:

  • You can use CLI without the HTTP server
  • You can use the API without MCP
  • Any new access interface just imports the Store
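
A minimal sketch of this constraint, assuming a hypothetical two-method Store (the real interfaces in internal/store/store.go are much larger). The names memStore, cliSet, and apiGet are illustrative only. Each access layer takes the interface as its only dependency, so neither needs to know the other exists.

```go
package main

import (
	"errors"
	"fmt"
)

// Store is a hypothetical, trimmed-down stand-in for a primitive's Store
// interface; the real one has many more operations.
type Store interface {
	Put(key, value string) error
	Get(key string) (string, error)
}

// memStore is an in-memory implementation used only for this sketch.
type memStore struct{ data map[string]string }

func newMemStore() *memStore { return &memStore{data: map[string]string{}} }

func (m *memStore) Put(key, value string) error {
	m.data[key] = value
	return nil
}

func (m *memStore) Get(key string) (string, error) {
	v, ok := m.data[key]
	if !ok {
		return "", errors.New("not found")
	}
	return v, nil
}

// cliSet and apiGet model two sibling access layers. Each depends only on
// Store, never on the other.
func cliSet(s Store, key, value string) error { return s.Put(key, value) }

func apiGet(s Store, key string) (string, error) { return s.Get(key) }

func main() {
	s := newMemStore()
	if err := cliSet(s, "greeting", "hello"); err != nil {
		panic(err)
	}
	v, err := apiGet(s, "greeting")
	fmt.Println(v, err)
}
```

Swapping memStore for a SQLite-backed implementation would not change either consumer, which is the point of keeping the Store as the only shared dependency.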

Store Interface

The Store is the central abstraction. Each primitive defines its own interface in internal/store/store.go.

taskprim Store (21 operations)

Operation     Description
CreateTask    Persist a new task (store assigns ID, state, timestamps)
GetTask       Retrieve a single task by ID
ListTasks     Filter and list tasks (by list, state, labels, source, etc.)
UpdateTask    Partial update to mutable fields
DoneTask      Mark task as done (sets resolved_at)
KillTask      Mark task as killed with reason
MarkSeen      Record that an agent has seen a task
MarkAllSeen   Mark all open tasks in a list as seen by an agent
AddDep        Add a dependency edge (with cycle detection via recursive CTE)
RemoveDep     Remove a dependency edge
Deps          List tasks that a given task depends on
Dependents    List tasks that depend on a given task (reverse lookup)
Frontier      Open tasks with all dependencies resolved or no dependencies
DepEdges      Raw dependency edges, optionally filtered by list
ListLabels    All labels with count of open tasks per label
ClearLabel    Remove a label from all tasks
ListLists     All lists with task counts by state
Stats         Aggregate counts (open, done, killed)
ExportTasks   Full export for data portability
ImportTasks   Bulk import preserving IDs
Close         Release database connection

stateprim Store (13 operations)

Operation       Description
Set             Create or update a record (upsert)
Get             Retrieve by namespace + key
Has             Check existence
SetIfNew        Create only if key doesn't exist
Append          Create immutable record with auto-generated key
Delete          Remove by namespace + key
Query           Records matching filter (namespace, key prefix, time window)
Purge           Delete records older than a duration
ListNamespaces  All namespaces with record counts
Stats           Aggregate counts
ExportRecords   Full export, optionally filtered
ImportRecords   Bulk import preserving keys
Close           Release database connection

knowledgeprim Store (23 operations)

Operation           Description
CaptureEntity       Persist a new entity with optional embedding vector
GetEntity           Retrieve a single entity by ID (includes edges)
UpdateEntity        Partial update to mutable fields
DeleteEntity        Remove entity and all connected edges
CreateEdge          Create a weighted, contextualized connection
UpdateEdge          Update edge context or weight
StrengthenEdge      Increment an edge's weight by 1.0
DeleteEdge          Remove a connection between entities
SearchFTS           Full-text search via FTS5 (BM25 ranking)
SearchVector        Semantic search via cosine distance on embeddings
SearchHybrid        Combined FTS + vector via Reciprocal Rank Fusion (k=60)
Related             Multi-hop graph traversal with direction and weight filters
Discover            Pattern detection: orphans, clusters, bridges, temporal groups, weak edges
ListTypes           All entity types with counts
ListRelationships   All relationship types with counts
Stats               Aggregate counts (entities, edges, vectors, orphans, DB size)
ExportEntities      Full export with optional type filter
ImportEntities      Bulk import preserving IDs
GetEmbeddingMeta    Fetch the stored embedding provider/model metadata for this database
SetEmbeddingMeta    Write or overwrite the embedding metadata record
StripVectors        Delete all embedding vectors and metadata (reverts to FTS5-only)
UpdateEntityVector  Upsert a single entity's embedding vector (used by re-embed)
Close               Release database connection

queueprim Store (16 operations)

Operation           Description
EnqueueJob          Persist a new job (store assigns ID, status, timestamps)
DequeueJob          Atomically claim the next available job in a queue (status=pending AND visible_after ≤ now)
CompleteJob         Mark a claimed job as done; optionally store output payload
FailJob             Mark a claimed job as failed; re-queues it if retries remain, otherwise moves it to dead
ReleaseJob          Return a claimed job to pending immediately (unclaim)
ExtendJob           Extend a claimed job's visibility timeout to prevent auto-release
PeekJob             Inspect the next available job without claiming it
GetJob              Retrieve a single job by ID
ListJobs            Filter and list jobs (by queue, status, type, age)
ListQueues          All named queues with job counts by status
Stats               Aggregate counts across all queues
PurgeJobs           Delete jobs matching queue + status + age criteria; returns count
ExportJobs          Full export of all jobs in a queue
ImportJobs          Bulk import preserving original IDs
SweepExpiredClaims  Release claimed jobs whose visible_after has passed; called by background sweeper
Close               Release database connection

Domain Models

taskprim: Task

Task {
    ID             string      // t_<nanoid>, assigned by store
    List           string      // required: which list
    What           string      // required: task description
    Source         string      // required: who created it
    State          State       // open → done | killed
    WaitingOn      *string     // optional: blocking dependency
    ParentID       *string     // optional: subtask relationship
    Context        *string     // optional: background info
    Labels         []string    // freeform tags
    Created        time.Time   // assigned by store
    Updated        time.Time   // assigned by store
    ResolvedAt     *time.Time  // set on done/kill
    ResolvedReason *string     // why it was killed
}

State machine:

          done()
  open ──────────► done

    │  kill(reason)
    └──────────► killed

Tasks start as open. Transitions to done or killed are one-way. There is no restore/reopen.
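
The one-way transitions can be sketched as a small guard function. This is an illustration only: the State constants, Transition, and ErrTerminal are names assumed for the example, not taskprim's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// State mirrors the three task states described above.
type State string

const (
	StateOpen   State = "open"
	StateDone   State = "done"
	StateKilled State = "killed"
)

// ErrTerminal is a hypothetical error for leaving a resolved state.
var ErrTerminal = errors.New("task is already resolved")

// Transition returns the next state, or an error when the move is not
// open -> done or open -> killed.
func Transition(from, to State) (State, error) {
	if from != StateOpen {
		return from, ErrTerminal
	}
	if to != StateDone && to != StateKilled {
		return from, fmt.Errorf("invalid target state %q", to)
	}
	return to, nil
}

func main() {
	s, err := Transition(StateOpen, StateDone)
	fmt.Println(s, err)

	// Reopening a resolved task is rejected.
	_, err = Transition(StateDone, StateOpen)
	fmt.Println(err)
}
```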

Dependency graph:

DepEdge {
    TaskID    string  // the task that is blocked
    DependsOn string  // the task it depends on
}

Stored in task_deps table with composite primary key (task_id, depends_on) and a self-reference check (task_id != depends_on). Cycle detection is enforced via recursive CTE on AddDep. waiting_on (freeform text for external/human blockers) and task_deps (structural task-to-task edges) coexist — they serve different purposes.
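
The store performs cycle detection in SQL with a recursive CTE. The sketch below swaps in an in-memory depth-first search to show the same check in plain Go: a new edge is rejected if the target already depends, directly or transitively, on the source. The deps type and its methods are hypothetical stand-ins for the task_deps table.

```go
package main

import "fmt"

// deps maps a task ID to the IDs it depends on (an in-memory stand-in for
// the task_deps table).
type deps map[string][]string

// reaches reports whether target is reachable from start by following
// dependency edges.
func (d deps) reaches(start, target string, seen map[string]bool) bool {
	if start == target {
		return true
	}
	if seen[start] {
		return false
	}
	seen[start] = true
	for _, next := range d[start] {
		if d.reaches(next, target, seen) {
			return true
		}
	}
	return false
}

// AddDep records "taskID depends on dependsOn" unless the edge is a
// self-reference or would close a cycle.
func (d deps) AddDep(taskID, dependsOn string) error {
	if taskID == dependsOn {
		return fmt.Errorf("task %s cannot depend on itself", taskID)
	}
	// A cycle appears if dependsOn already (transitively) depends on taskID.
	if d.reaches(dependsOn, taskID, map[string]bool{}) {
		return fmt.Errorf("adding %s -> %s would create a cycle", taskID, dependsOn)
	}
	d[taskID] = append(d[taskID], dependsOn)
	return nil
}

func main() {
	d := deps{}
	fmt.Println(d.AddDep("t_a", "t_b")) // accepted
	fmt.Println(d.AddDep("t_b", "t_c")) // accepted
	fmt.Println(d.AddDep("t_c", "t_a")) // rejected: t_a already reaches t_c
}
```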

stateprim: Record

Record {
    Namespace  string           // required: scope
    Key        string           // required: identifier
    Value      json.RawMessage  // required: JSON payload
    Immutable  bool             // true for append records
    CreatedAt  time.Time        // assigned by store
    UpdatedAt  time.Time        // assigned by store
}

Three access patterns share the same model:

  1. Key-value state (set/get): current state, updatable. Immutable=false.
  2. Dedup lookups (has/set-if-new): existence checks, create-once semantics.
  3. Append log (append): immutable, timestamped entries. Immutable=true, auto-generated key.
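
A rough in-memory sketch of how the three patterns can share one record shape. Everything here (memState, the id helper, the counter-based append key) is assumed for illustration; the real store is SQLite-backed and generates its own keys.

```go
package main

import "fmt"

// record is a simplified, hypothetical version of the Record model.
type record struct {
	Value     string
	Immutable bool
}

// memState is an in-memory stand-in for the SQLite-backed store.
type memState struct {
	data map[string]record
	seq  int
}

func newMemState() *memState { return &memState{data: map[string]record{}} }

// id joins namespace and key into one map key.
func id(ns, key string) string { return ns + "\x00" + key }

// Set upserts the current value (pattern 1: key-value state).
func (m *memState) Set(ns, key, value string) {
	m.data[id(ns, key)] = record{Value: value}
}

// SetIfNew stores the value only if the key is absent and reports whether
// it did (pattern 2: dedup lookups).
func (m *memState) SetIfNew(ns, key, value string) bool {
	if _, exists := m.data[id(ns, key)]; exists {
		return false
	}
	m.Set(ns, key, value)
	return true
}

// Append stores an immutable record under a generated key (pattern 3).
// The counter-based key format is an assumption made for this sketch.
func (m *memState) Append(ns, value string) string {
	m.seq++
	key := fmt.Sprintf("a_%06d", m.seq)
	m.data[id(ns, key)] = record{Value: value, Immutable: true}
	return key
}

func main() {
	m := newMemState()
	m.Set("agent", "cursor", "42")
	fmt.Println(m.SetIfNew("seen", "msg-1", "{}")) // true: first sighting
	fmt.Println(m.SetIfNew("seen", "msg-1", "{}")) // false: already recorded
	fmt.Println(m.Append("log", `{"event":"start"}`))
}
```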

knowledgeprim: Entity + Edge

Entity {
    ID             string           // e_<nanoid>, assigned by store
    Type           string           // required: entity type (article, concept, pattern, etc.)
    Title          string           // required: entity title
    Body           *string          // optional: entity body text
    URL            *string          // optional: source URL
    Source         string           // required: who captured it
    Properties     json.RawMessage  // optional: custom JSON
    CreatedAt      time.Time        // assigned by store
    UpdatedAt      time.Time        // assigned by store
    Edges          []*Edge          // populated on GetEntity
}

Edge {
    SourceID       string           // required: source entity ID
    TargetID       string           // required: target entity ID
    Relationship   string           // required: relationship type (relates_to, extends, etc.)
    Weight         float64          // starts at 1.0, grows via strengthen
    Context        *string          // optional: WHY this connection exists
    CreatedAt      time.Time        // assigned by store
    UpdatedAt      time.Time        // assigned by store
}

EmbeddingMeta {
    Provider   string     // embedding provider name (e.g., "gemini", "openai")
    Model      string     // model name (e.g., "text-embedding-004")
    Dimensions int        // vector dimensions produced by this model
    CreatedAt  time.Time  // when the metadata was first recorded
}

EmbeddingMeta is a single-row record (enforced by CHECK (id = 1) in SQLite) that tracks which embedding provider and model generated the vectors in this database. One row per .db file. Used by CheckEmbeddingMeta to prevent silent degradation when the configured provider changes.

Entity types are freeform strings — agents define their own vocabulary (e.g., article, thought, concept, pattern, observation, decision, bug).

Relationship types are also freeform (e.g., relates_to, contradicts, extends, inspired_by, applies_to, similar_to).

Three search modes:

  1. FTS — keyword search via SQLite FTS5, BM25 ranking
  2. Vector — semantic search via cosine distance on embeddings
  3. Hybrid — combines FTS + vector results via Reciprocal Rank Fusion (k=60)
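
For the hybrid mode, Reciprocal Rank Fusion scores each ID as the sum of 1/(k + rank) over the result lists it appears in, with rank starting at 1. The function below is a standalone sketch of that formula with k=60; the rrf name, its signature, and the alphabetical tie-break are assumptions, not knowledgeprim's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// rrf fuses ranked ID lists. Each list contributes 1/(k + rank) to an ID's
// score, where rank is the 1-based position in that list.
func rrf(k float64, lists ...[]string) []string {
	scores := map[string]float64{}
	for _, list := range lists {
		for i, id := range list {
			scores[id] += 1.0 / (k + float64(i+1))
		}
	}
	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(a, b int) bool {
		if scores[ids[a]] != scores[ids[b]] {
			return scores[ids[a]] > scores[ids[b]]
		}
		return ids[a] < ids[b] // deterministic tie-break
	})
	return ids
}

func main() {
	fts := []string{"e_a", "e_b", "e_c"} // keyword ranking
	vec := []string{"e_b", "e_d", "e_a"} // vector ranking
	fmt.Println(rrf(60, fts, vec))       // [e_b e_a e_d e_c]
}
```

Entities that appear in both lists (e_b, e_a) outrank those found by only one mode, even when the single-mode hit ranked higher in its own list.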

Discovery operations surface non-obvious patterns:

  • Orphans — entities with no edges
  • Clusters — densely connected entity groups
  • Bridges — high-degree entities connecting separate clusters
  • Temporal — entity type distribution over time periods
  • Weak edges — edges missing context prose

queueprim: Job

Job {
    ID            string           // q_<nanoid>, assigned by store
    Queue         string           // required: named queue (slashes allowed, e.g., infra/prod)
    Type          string           // optional: job type category for type-filtered dequeue
    Priority      Priority         // high | normal (default) | low
    Payload       json.RawMessage  // required: arbitrary JSON work description
    Status        Status           // pending → claimed → done | failed | dead
    ClaimedBy     *string          // set on dequeue: worker name
    ClaimedAt     *time.Time       // set on dequeue
    VisibleAfter  time.Time        // delayed jobs: not visible until this time
    CompletedAt   *time.Time       // set on complete
    Output        json.RawMessage  // optional: worker result payload
    FailureReason *string          // set on fail
    AttemptCount  int              // incremented on each dequeue
    MaxRetries    int              // 0 = one-shot; >0 = retry up to N times before dead
    CreatedAt     time.Time        // assigned by store
    UpdatedAt     time.Time        // assigned by store
}

State machine:

  enqueue()           dequeue()           complete()
  ─────────► pending ──────────► claimed ──────────► done

                                    │  fail() + retries remain
                                    ├──────────────────────── → pending (re-queued)

                                    │  fail() + retries exhausted
                                    │  fail(--dead)
                                    └──────────────────────── → dead

Priority ordering: high → normal → low. Within a priority level, ordering is FIFO.

Visibility timeout: Claimed jobs hold a visible_after lock. If a worker crashes without completing, a background sweeper goroutine releases the claim once visible_after passes, returning the job to pending.
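
The ordering and visibility rules can be illustrated with an in-memory sketch. The job struct, the integer priority encoding, the Seq field, and both function names are assumptions for this example; the real store does this atomically in SQLite.

```go
package main

import (
	"fmt"
	"time"
)

// job is a reduced, hypothetical version of the Job model.
type job struct {
	ID           string
	Priority     int // 0 = high, 1 = normal, 2 = low (assumed encoding)
	Seq          int // enqueue order, used for FIFO within a priority
	Status       string
	VisibleAfter time.Time
}

// dequeue claims the best pending, visible job: highest priority first,
// then oldest. The claim is held until now + timeout.
func dequeue(jobs []*job, now time.Time, timeout time.Duration) *job {
	var best *job
	for _, j := range jobs {
		if j.Status != "pending" || j.VisibleAfter.After(now) {
			continue
		}
		if best == nil || j.Priority < best.Priority ||
			(j.Priority == best.Priority && j.Seq < best.Seq) {
			best = j
		}
	}
	if best != nil {
		best.Status = "claimed"
		best.VisibleAfter = now.Add(timeout)
	}
	return best
}

// sweepExpired returns claimed jobs whose VisibleAfter has passed to
// pending and reports how many were released.
func sweepExpired(jobs []*job, now time.Time) int {
	released := 0
	for _, j := range jobs {
		if j.Status == "claimed" && !j.VisibleAfter.After(now) {
			j.Status = "pending"
			released++
		}
	}
	return released
}

func main() {
	now := time.Now()
	jobs := []*job{
		{ID: "q_1", Priority: 2, Seq: 1, Status: "pending"},
		{ID: "q_2", Priority: 0, Seq: 2, Status: "pending"},
		{ID: "q_3", Priority: 0, Seq: 3, Status: "pending"},
	}
	claimed := dequeue(jobs, now, 30*time.Second)
	fmt.Println(claimed.ID) // q_2: high priority, oldest of its level

	// Simulate a crashed worker: the sweeper runs after the timeout.
	fmt.Println(sweepExpired(jobs, now.Add(31*time.Second)), claimed.Status)
}
```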

Data Flow

CLI Command

User → cobra command → parse flags → store.Operation() → format output → stdout

HTTP API Request

Client → HTTP request
  → RequestID middleware (assigns/propagates X-Request-ID)
  → Logging middleware (logs method, path, status, duration)
  → Recovery middleware (catches panics → 500)
  → Auth middleware (validates Bearer token → 401 if invalid)
  → API handler → store.Operation()
  → JSON response
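
A sketch of how such a chain can be composed with plain net/http. It is not the primkit server package: chain, do, the handlers, and the placeholder request ID are all assumed for illustration, the logging layer is omitted, and the auth check uses a plain string comparison for brevity.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

type middleware func(http.Handler) http.Handler

// chain wraps h so that the first middleware listed runs first.
func chain(h http.Handler, mws ...middleware) http.Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// requestID propagates X-Request-ID or assigns a placeholder value.
func requestID(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		id := r.Header.Get("X-Request-ID")
		if id == "" {
			id = "generated-id" // a real server would generate a unique ID
		}
		w.Header().Set("X-Request-ID", id)
		next.ServeHTTP(w, r)
	})
}

// recovery converts a panic in later layers into a 500 response.
func recovery(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			if rec := recover(); rec != nil {
				http.Error(w, "internal error", http.StatusInternalServerError)
			}
		}()
		next.ServeHTTP(w, r)
	})
}

// auth rejects requests whose bearer token does not match want.
func auth(want string) middleware {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if r.Header.Get("Authorization") != "Bearer "+want {
				http.Error(w, "unauthorized", http.StatusUnauthorized)
				return
			}
			next.ServeHTTP(w, r)
		})
	}
}

var okHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, `{"ok":true}`)
})

var panicHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	panic("boom")
})

// do sends one in-process request and returns the status code.
func do(h http.Handler, token string) int {
	req := httptest.NewRequest(http.MethodGet, "/tasks", nil)
	req.Header.Set("Authorization", "Bearer "+token)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := chain(okHandler, requestID, recovery, auth("secret"))
	fmt.Println(do(h, "secret"), do(h, "wrong")) // 200 401
}
```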

MCP Tool Call

Agent → MCP protocol (stdio or SSE)
  → mcp-go framework → tool handler → store.Operation()
  → MCP response

Replication

Litestream is embedded as a Go library (not a sidecar process). WAL frames are continuously streamed to S3-compatible object storage (R2, S3, B2, GCS).

Lifecycle

Replication runs for every command — CLI, serve, and MCP alike. This is managed in root.go via Cobra's PersistentPreRunE and PersistentPostRunE:

PersistentPreRunE:
  1. Resolve database path (flag → env var → home default)
  2. Load config (YAML + env overrides)
  3. RestoreIfNeeded: if replication enabled and local DB missing,
     download from replica before opening
  4. Open SQLite database
  5. Start Litestream replication (continuous WAL streaming)

Command executes (add, list, serve, mcp, etc.)

PersistentPostRunE:
  6. Stop replication with final sync

For short-lived CLI commands, the final sync ensures the last WAL changes reach the replica. For long-running serve/MCP commands, replication streams continuously.

Restore

Two restore paths:

  • Auto-restore (RestoreIfNeeded): On startup, if the local DB file doesn't exist but replication is configured, the DB is automatically downloaded from the replica. This enables stateless deployments.
  • Manual restore (taskprim restore / stateprim restore / knowledgeprim restore / queueprim restore): Point-in-time recovery. Overwrites the local database with the latest replica.

Embedding (knowledgeprim only)

knowledgeprim supports optional vector embeddings for semantic search. The embedding layer is a pluggable interface:

type Embedder interface {
    Embed(ctx context.Context, text string) ([]float32, error)
    Dimensions() int
    Provider() string  // "gemini", "openai", or "custom"
    Model() string     // e.g., "text-embedding-004"
}

Provider() and Model() are used by the metadata safety layer to detect provider changes. Three implementations ship out of the box:

Provider  Model                           Dimensions
gemini    text-embedding-004              768
openai    text-embedding-3-small          1536
custom    Any OpenAI-compatible endpoint  Configurable

Embedding is optional. Without it, knowledgeprim still provides:

  • Full-text search (FTS5/BM25)
  • Manual edge creation
  • Graph traversal
  • Discovery operations

You lose only the features that depend on vectors: vector search, hybrid search, and auto-connect.

Embedding Metadata Safety

Each knowledgeprim database stores a single EmbeddingMeta row (in the embedding_meta table, CHECK (id = 1)) recording which provider and model generated the stored vectors. This prevents silent degradation when switching embedding providers — old 768-dimension Gemini vectors are incompatible with a new 1536-dimension OpenAI config.

Flow on capture or search --mode vector/hybrid:

CheckEmbeddingMeta(provider, model, dimensions)
  ├── No meta yet → OK (first embed will call EnsureEmbeddingMeta)
  ├── Meta matches config → OK
  └── Meta differs → ErrEmbeddingMismatch with clear message:
        "db uses gemini/text-embedding-004 (768d),
         config uses openai/text-embedding-3-small (1536d).
         Use --mode fts, run re-embed, or pass --force"

Recovery options:

Option                                 When to use
knowledgeprim re-embed                 Switching to a new provider; regenerates all vectors
knowledgeprim strip-vectors --confirm  Dropping back to FTS5-only; removes all vectors and metadata
--force flag                           Bypassing the check temporarily (risky: mixed-dimension vectors in one DB)
--mode fts on search                   Read-only fallback that skips vector operations entirely
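
The three branches of the metadata check can be sketched as a pure function. The signature below (a stored pointer plus a config value) and the isMismatch helper are assumptions for this example; the document only names CheckEmbeddingMeta(provider, model, dimensions) and ErrEmbeddingMismatch.

```go
package main

import (
	"errors"
	"fmt"
)

// EmbeddingMeta holds the fields compared by the check.
type EmbeddingMeta struct {
	Provider   string
	Model      string
	Dimensions int
}

var ErrEmbeddingMismatch = errors.New("embedding mismatch")

// checkEmbeddingMeta compares stored metadata (nil if none recorded yet)
// with the configured embedder.
func checkEmbeddingMeta(stored *EmbeddingMeta, cfg EmbeddingMeta) error {
	if stored == nil {
		return nil // no meta yet: the first embed records it
	}
	if *stored == cfg {
		return nil // meta matches config
	}
	return fmt.Errorf("%w: db uses %s/%s (%dd), config uses %s/%s (%dd)",
		ErrEmbeddingMismatch,
		stored.Provider, stored.Model, stored.Dimensions,
		cfg.Provider, cfg.Model, cfg.Dimensions)
}

// isMismatch reports whether err wraps ErrEmbeddingMismatch.
func isMismatch(err error) bool { return errors.Is(err, ErrEmbeddingMismatch) }

func main() {
	gemini := EmbeddingMeta{"gemini", "text-embedding-004", 768}
	openai := EmbeddingMeta{"openai", "text-embedding-3-small", 1536}

	fmt.Println(checkEmbeddingMeta(nil, gemini))     // no meta yet
	fmt.Println(checkEmbeddingMeta(&gemini, gemini)) // match
	fmt.Println(checkEmbeddingMeta(&gemini, openai)) // mismatch
}
```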

Auto-Connect

When embedding is configured, CaptureEntity can automatically link new entities to semantically similar existing ones:

  1. Embed the new entity's title + body
  2. Cosine distance search against all existing embeddings
  3. Entities whose cosine distance is below the threshold get automatic similar_to edges
  4. Configurable: threshold (default 0.35), max connections (default 10)
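
A minimal sketch of steps 2 to 4, assuming vectors are already computed and held in memory. cosineDistance, autoConnect, and the candidate type are illustrative names; how the real store scans embeddings is not shown in this document.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// cosineDistance returns 1 - cosine similarity; 0 means same direction.
func cosineDistance(a, b []float32) float64 {
	var dot, na, nb float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		na += float64(a[i]) * float64(a[i])
		nb += float64(b[i]) * float64(b[i])
	}
	if na == 0 || nb == 0 {
		return 1 // treat zero vectors as unrelated
	}
	return 1 - dot/(math.Sqrt(na)*math.Sqrt(nb))
}

type candidate struct {
	ID       string
	Distance float64
}

// autoConnect returns up to limit existing IDs whose distance to vec is
// below threshold, nearest first. Each would receive a similar_to edge.
func autoConnect(vec []float32, existing map[string][]float32, threshold float64, limit int) []candidate {
	var out []candidate
	for id, v := range existing {
		if len(v) != len(vec) {
			continue // skip vectors with a different dimension
		}
		if d := cosineDistance(vec, v); d < threshold {
			out = append(out, candidate{ID: id, Distance: d})
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Distance != out[j].Distance {
			return out[i].Distance < out[j].Distance
		}
		return out[i].ID < out[j].ID
	})
	if len(out) > limit {
		out = out[:limit]
	}
	return out
}

func main() {
	existing := map[string][]float32{
		"e_near": {0.9, 0.1},
		"e_mid":  {1, 1},
		"e_far":  {0, 1},
	}
	// Defaults from the list above: threshold 0.35, max connections 10.
	for _, c := range autoConnect([]float32{1, 0}, existing, 0.35, 10) {
		fmt.Printf("%s %.3f\n", c.ID, c.Distance)
	}
}
```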

Authentication

Authentication is only active in serve and MCP SSE modes. CLI mode uses filesystem permissions.

  • API keys are configured in config.yaml and mapped to human-readable names
  • Keys are validated using constant-time comparison (prevents timing attacks)
  • When no keys are configured, the server runs in open mode (all requests allowed)
  • The authenticated key's name is injected into the request context and used as the source field for created tasks/records
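
Constant-time key validation can be done with crypto/subtle from the standard library. The lookupKey function and the key-to-name map layout are assumptions for this sketch. Note that subtle.ConstantTimeCompare returns immediately when the two lengths differ, so key length is not hidden.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// lookupKey returns the name mapped to token. It compares against every
// configured key without stopping early. An empty key set means open mode.
func lookupKey(keys map[string]string, token string) (name string, ok bool) {
	if len(keys) == 0 {
		return "", true // open mode: all requests allowed
	}
	for key, n := range keys {
		if subtle.ConstantTimeCompare([]byte(key), []byte(token)) == 1 {
			name, ok = n, true
		}
	}
	return name, ok
}

func main() {
	keys := map[string]string{"tp_sk_example": "deploy-bot"} // hypothetical key and name
	fmt.Println(lookupKey(keys, "tp_sk_example"))
	fmt.Println(lookupKey(keys, "wrong"))
	fmt.Println(lookupKey(nil, "anything"))
}
```

The returned name is what would be placed in the request context and used as the source field.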

HTTP Server

The HTTP server wraps net/http with:

  • Graceful shutdown: Listens for SIGINT/SIGTERM, gives in-flight requests 10 seconds to complete
  • Timeouts: 30s read, 30s write, 60s idle
  • Middleware chain: RequestID → Logging → Recovery → Auth → Handler
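
These settings map directly onto the standard http.Server fields and Shutdown method. The sketch below shows one way to wire them up; newServer, run, and the listen address are assumptions, not the primkit server package.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// newServer applies the timeouts listed above.
func newServer(addr string, h http.Handler) *http.Server {
	return &http.Server{
		Addr:         addr,
		Handler:      h,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  60 * time.Second,
	}
}

// run serves until SIGINT or SIGTERM arrives, then gives in-flight
// requests 10 seconds to complete.
func run(srv *http.Server) error {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	errc := make(chan error, 1)
	go func() { errc <- srv.ListenAndServe() }()

	select {
	case err := <-errc:
		return err // failed to start or stopped unexpectedly
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return srv.Shutdown(shutdownCtx)
}

func main() {
	srv := newServer("127.0.0.1:8080", http.NotFoundHandler()) // hypothetical address
	fmt.Println(srv.ReadTimeout, srv.WriteTimeout, srv.IdleTimeout)
	// Call run(srv) to start serving; it blocks until a signal arrives.
}
```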

SQLite

Pure Go SQLite via modernc.org/sqlite (no CGo). This simplifies cross-compilation, especially for ARM64 (Raspberry Pi).

  • WAL mode: concurrent readers during writes, required for Litestream and serve mode
  • Foreign keys: enforced for referential integrity
  • Busy timeout: 5 seconds to prevent lock contention errors
  • Embedded migrations: SQL files are embedded via embed.FS for single-binary deployment
  • In-memory mode: available for tests (db.OpenInMemory())

Build

The monorepo uses a Go workspace (go.work) with five modules. The Makefile provides:

Target         Description
make build     Compile bin/taskprim, bin/stateprim, bin/knowledgeprim, and bin/queueprim
make build-pi  Cross-compile for ARM64 Linux
make test      Run all tests with race detector
make lint      Run go vet across all modules
make fmt       Format all code with gofmt
make tidy      Run go mod tidy for all modules
make all       tidy → fmt → lint → test → build