dealspace/EMBED-SPEC.md

Dealspace — AI Matching & Embedding Specification

Version: 0.1 — 2026-02-28
Status: Pre-implementation. Addresses SPEC-REVIEW.md section 3 (race conditions) and section 7.4 (magic threshold).


1. Embedding Model Selection

1.1 Candidates

| Model | Provider | Dimensions | Context | Latency | Cost | Retention |
|---|---|---|---|---|---|---|
| nomic-embed-text-v1.5 | Fireworks | 768 | 8192 | ~50ms | $0.008/1M | Zero |
| voyage-finance-2 | Voyage AI | 1024 | 16000 | ~80ms | $0.12/1M | 30 days |

1.2 Domain Analysis

M&A requests contain:

  • Financial terminology: EBITDA, working capital adjustments, earnout provisions, rep & warranty
  • Legal terminology: indemnification, IP assignments, material adverse change, disclosure schedules
  • Industry-specific terms: varies by deal (tech: ARR, churn; healthcare: HIPAA, 340B; manufacturing: capex, inventory turns)

Voyage-finance-2 was trained specifically on financial documents (10-Ks, credit agreements, M&A filings). It shows ~8% improvement on financial similarity benchmarks vs. general-purpose models.

Nomic-embed-text-v1.5 is general-purpose but performs well on semantic matching. Zero retention is critical for M&A confidentiality.

1.3 Recommendation

Use Fireworks nomic-embed-text-v1.5 for MVP:

  1. Zero retention — Voyage's 30-day retention is unacceptable for M&A data
  2. 15x cheaper — allows generous matching without cost concerns
  3. Proven stack — same infrastructure as inou (known good)
  4. Good enough — general semantic similarity works for request matching; we're matching "audited financials" to "FY2024 audit report," not parsing covenant calculations

Revisit voyage-finance-2 when:

  • Voyage offers a zero-retention API option
  • Match quality metrics show <85% human confirmation rate
  • We expand to quantitative matching (finding answers by numerical similarity)

1.4 Fireworks Configuration

const (
    EmbedModel     = "nomic-embed-text-v1.5"
    EmbedEndpoint  = "https://api.fireworks.ai/inference/v1/embeddings"
    EmbedDimension = 768
    EmbedMaxTokens = 8192  // model context limit
)

API key stored in environment: FIREWORKS_API_KEY
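For reference, the request body follows the OpenAI-compatible embeddings shape that Fireworks exposes. A minimal sketch of building it; the helper names (`embedPayload`, `buildEmbedRequestBody`) are illustrative, and the exact fields should be verified against the Fireworks API docs:

```go
package main

import (
    "encoding/json"
    "fmt"
)

// embedPayload mirrors the OpenAI-compatible embeddings request body
// (assumed shape; verify against Fireworks documentation).
type embedPayload struct {
    Model string   `json:"model"`
    Input []string `json:"input"`
}

// buildEmbedRequestBody serializes the texts to send. The API key goes
// in an "Authorization: Bearer $FIREWORKS_API_KEY" header, not the body.
func buildEmbedRequestBody(model string, texts []string) ([]byte, error) {
    return json.Marshal(embedPayload{Model: model, Input: texts})
}

func main() {
    body, err := buildEmbedRequestBody("nomic-embed-text-v1.5",
        []string{"Request: FY2024 audited financials"})
    if err != nil {
        panic(err)
    }
    fmt.Println(string(body))
}
```

Batching multiple texts into one `input` array is what makes `EmbedBatch` (section 11.1) a single API call.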


2. What to Embed

2.1 Request Embedding

Embed the semantic intent, not raw fields.

func BuildRequestEmbedText(r *Request, ws *Workstream) string {
    var b strings.Builder
    
    // Workstream context (aids cross-workstream relevance)
    b.WriteString("Workstream: ")
    b.WriteString(ws.Name)
    b.WriteString("\n\n")
    
    // Title is high-signal
    b.WriteString("Request: ")
    b.WriteString(r.Title)
    b.WriteString("\n\n")
    
    // Body provides detail
    if r.Body != "" {
        b.WriteString("Details: ")
        b.WriteString(r.Body)
    }
    
    return strings.TrimSpace(b.String())
}

Do NOT embed:

  • Request ID, ref numbers (non-semantic)
  • Due dates, priority (operational, not semantic)
  • Assignee names (PII, not relevant to matching)
  • Status (changes frequently, embedding is point-in-time)

2.2 Answer Embedding

Answers may be long (multi-document explanations). Use chunking with overlap.

const (
    AnswerChunkSize    = 1500  // tokens (~6000 chars)
    AnswerChunkOverlap = 200   // tokens overlap for context continuity
)

func BuildAnswerEmbedTexts(a *Answer, ws *Workstream) []string {
    var chunks []string
    
    // Prefix every chunk with context
    prefix := fmt.Sprintf("Workstream: %s\nAnswer: %s\n\n", ws.Name, a.Title)
    
    // Token limits are approximated as bytes (1 token ≈ 4 chars)
    chunkBytes := AnswerChunkSize * 4
    overlapBytes := AnswerChunkOverlap * 4
    
    body := a.Body
    if len(body) <= chunkBytes { // fits in a single chunk
        return []string{prefix + body}
    }
    
    // Chunk long answers
    for start := 0; start < len(body); {
        end := start + chunkBytes
        if end > len(body) {
            end = len(body)
        }
        
        // Prefer a sentence boundary near the end of the chunk
        if end < len(body) {
            if idx := strings.LastIndex(body[start:end], ". "); idx > 0 {
                end = start + idx + 1
            }
        }
        
        chunks = append(chunks, prefix+body[start:end])
        if end == len(body) {
            break
        }
        
        // Step back for overlap, but always advance past the previous
        // start so a short chunk cannot cause an infinite loop
        next := end - overlapBytes
        if next <= start {
            next = end
        }
        start = next
    }
    
    return chunks
}

Do NOT embed:

  • File contents (privacy: see section 10)
  • File names (may contain sensitive identifiers)
  • Rejection reasons (operational, not semantic)
  • Internal comments (IB-only context)

2.3 Embedding Storage

CREATE TABLE embeddings (
    id         TEXT PRIMARY KEY,           -- UUID
    entry_id   TEXT NOT NULL,              -- request or answer entry_id
    chunk_idx  INTEGER NOT NULL DEFAULT 0, -- 0 for single-chunk, 0..N for multi-chunk
    vector     BLOB NOT NULL,              -- 768 float32 = 3072 bytes
    text_hash  TEXT NOT NULL,              -- SHA-256 of embedded text (dedup check)
    model      TEXT NOT NULL,              -- "nomic-embed-text-v1.5"
    created_at INTEGER NOT NULL,
    UNIQUE(entry_id, chunk_idx)
);

CREATE INDEX idx_embeddings_entry ON embeddings(entry_id);

Note: Vector stored as raw float32 bytes (little-endian). Cosine similarity computed in Go, not SQL.
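The BLOB encoding amounts to four little-endian bytes per float32. A sketch of the serialization helpers (names illustrative):

```go
package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// encodeVector packs a float32 slice into little-endian bytes for the
// embeddings.vector BLOB column (768 floats -> 3072 bytes).
func encodeVector(v []float32) []byte {
    buf := make([]byte, 4*len(v))
    for i, f := range v {
        binary.LittleEndian.PutUint32(buf[4*i:], math.Float32bits(f))
    }
    return buf
}

// decodeVector is the inverse, used when loading candidates for search.
func decodeVector(b []byte) []float32 {
    v := make([]float32, len(b)/4)
    for i := range v {
        v[i] = math.Float32frombits(binary.LittleEndian.Uint32(b[4*i:]))
    }
    return v
}

func main() {
    orig := []float32{0.25, -1.5, 3.0}
    round := decodeVector(encodeVector(orig))
    fmt.Println(len(encodeVector(orig)), round[1]) // 12 -1.5
}
```

The round trip is exact because Float32bits/Float32frombits preserve the bit pattern; no precision is lost in storage.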


3. Retroactive Matching (Bidirectional)

3.1 The Problem (from SPEC-REVIEW §3.1)

The original spec only describes matching when a buyer submits a request. But:

  • New request should search existing published answers
  • New answer (when published) should search open requests

Both directions are required for complete coverage.

3.2 Matching Directions

Direction 1: Request → Answers
  Trigger: Request created OR request text updated
  Search:  All published answers in accessible workstreams
  Output:  Suggested answer_links with ai_score

Direction 2: Answer → Requests  
  Trigger: Answer published (stage = dataroom)
  Search:  All open requests in accessible workstreams
  Output:  Suggested answer_links with ai_score

3.3 Implementation

// Called when request is created or body/title changes
func MatchRequestToAnswers(ctx context.Context, requestID string) ([]AnswerMatch, error) {
    // 1. Get request embedding (create if missing)
    // 2. Load all published answer embeddings in same workstream
    // 3. Cosine similarity against each
    // 4. Return matches above threshold
}

// Called when answer is published
func MatchAnswerToRequests(ctx context.Context, answerID string) ([]RequestMatch, error) {
    // 1. Get answer embedding(s) (multi-chunk: use max score across chunks)
    // 2. Load all open request embeddings in same workstream
    // 3. Cosine similarity against each
    // 4. Return matches above threshold
}
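Steps 3 and 4 of both directions reduce to the same scoring helper. A sketch, with a hypothetical `scored` type standing in for the loaded candidates:

```go
package main

import (
    "fmt"
    "sort"
)

// scored pairs a candidate entry with its cosine similarity.
type scored struct {
    EntryID string
    Score   float32
}

// topMatches keeps candidates at or above threshold, best first.
// For multi-chunk answers, callers should first reduce chunk scores
// to a per-entry max before calling this.
func topMatches(candidates []scored, threshold float32) []scored {
    var out []scored
    for _, c := range candidates {
        if c.Score >= threshold {
            out = append(out, c)
        }
    }
    sort.Slice(out, func(i, j int) bool { return out[i].Score > out[j].Score })
    return out
}

func main() {
    got := topMatches([]scored{{"a1", 0.71}, {"a2", 0.93}, {"a3", 0.80}}, 0.72)
    fmt.Println(got) // highest score first; a1 filtered out
}
```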

3.4 Matching on Update

If a request body is edited:

  1. Recompute embedding (check text_hash — skip if unchanged)
  2. Re-run matching
  3. New suggestions appear; existing confirmed links preserved
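The text_hash check in step 1 is a plain SHA-256 over the embed text (the same value stored in embeddings.text_hash and embed_audit):

```go
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// textHash computes the SHA-256 stored in embeddings.text_hash,
// used to skip re-embedding when the text is unchanged.
func textHash(text string) string {
    sum := sha256.Sum256([]byte(text))
    return hex.EncodeToString(sum[:])
}

func main() {
    before := textHash("Request: FY2024 audited financials")
    after := textHash("Request: FY2024 audited financials")
    fmt.Println(before == after) // true — skip the API call
}
```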

If an answer body is edited before publish:

  • No action (draft state)

If an answer is re-published (correction):

  • Re-run matching
  • Flag for human review if new requests match

4. Broadcast Idempotency

4.1 The Problem (from SPEC-REVIEW §3.2)

Multiple requests can link to the same answer. Without idempotency:

  • Confirming R1↔A1 sends broadcast to Buyer A
  • Confirming R2↔A1 sends another broadcast to Buyer B
  • If Buyer A also asked R2... they get two notifications

4.2 Broadcasts Table

CREATE TABLE broadcasts (
    id           TEXT PRIMARY KEY,
    answer_id    TEXT NOT NULL REFERENCES entries(entry_id),
    request_id   TEXT NOT NULL REFERENCES entries(entry_id),
    recipient_id TEXT NOT NULL REFERENCES users(id),
    channel      TEXT NOT NULL,       -- "web" | "email" | "slack" | "teams"
    sent_at      INTEGER NOT NULL,
    UNIQUE(answer_id, request_id, recipient_id, channel)
);

CREATE INDEX idx_broadcasts_answer ON broadcasts(answer_id);
CREATE INDEX idx_broadcasts_recipient ON broadcasts(recipient_id);

4.3 Broadcast Logic

func BroadcastAnswer(ctx context.Context, tx *sql.Tx, answerID string) error {
    // 1. Get all confirmed answer_links for this answer
    links, err := getConfirmedLinks(tx, answerID)
    if err != nil {
        return err
    }
    
    // 2. For each link, get the request's origin_id (ultimate requester)
    recipients := make(map[string][]string) // user_id -> []request_id
    for _, link := range links {
        req, err := getRequest(tx, link.RequestID)
        if err != nil {
            return err
        }
        recipients[req.OriginID] = append(recipients[req.OriginID], link.RequestID)
    }
    
    // 3. For each recipient, check idempotency and send
    for userID, requestIDs := range recipients {
        for _, reqID := range requestIDs {
            // Check if already sent
            exists, err := broadcastExists(tx, answerID, reqID, userID, "web")
            if err != nil {
                return err
            }
            if exists {
                continue // idempotent skip
            }
            
            // Record broadcast
            err = insertBroadcast(tx, answerID, reqID, userID, "web", time.Now().UnixMilli())
            if err != nil {
                return err
            }
            
            // Queue notification; the queue must deliver only after the
            // transaction commits, or a rollback leaves a phantom notification
            NotifyQueue.Push(Notification{
                UserID:    userID,
                AnswerID:  answerID,
                RequestID: reqID,
            })
        }
    }
    
    return nil
}

4.4 Notification Deduplication

Even with idempotency per (answer, request, recipient), a user might get multiple notifications if they asked 5 equivalent questions.

User-facing behavior: Collapse into single notification:

"Answer published: [Title] — resolves 5 of your requests"

This is a presentation concern, not a data model change. The broadcasts table tracks each link; the notification renderer collapses them.
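The renderer-side collapse can be as simple as the sketch below. The plural message mirrors the copy above; the singular wording is an assumption:

```go
package main

import "fmt"

// renderNotification collapses N resolved requests into one message.
func renderNotification(answerTitle string, requestCount int) string {
    if requestCount == 1 {
        return fmt.Sprintf("Answer published: %s — resolves your request", answerTitle)
    }
    return fmt.Sprintf("Answer published: %s — resolves %d of your requests", answerTitle, requestCount)
}

func main() {
    fmt.Println(renderNotification("FY2024 Audited Financial Statements", 5))
}
```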


5. Configurable Similarity Threshold

5.1 The Problem (from SPEC-REVIEW §7.4)

Hardcoded 0.72 is a magic number that:

  • May be too strict for some workstreams (legal requests are verbose)
  • May be too loose for others (financial requests are terse)
  • Cannot be tuned without code changes

5.2 Per-Workstream Configuration

Add to workstream entry's Data:

{
  "name": "Finance",
  "match_config": {
    "threshold": 0.72,
    "auto_confirm_threshold": 0.95,
    "cross_workstream": ["Legal"]
  }
}

| Field | Type | Default | Description |
|---|---|---|---|
| threshold | float | 0.72 | Minimum score to suggest match |
| auto_confirm_threshold | float | null | If set, scores above this auto-confirm (no human review) |
| cross_workstream | []string | [] | Workstream slugs to include in matching (see section 6) |
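Parsing match_config with the documented defaults can be sketched as follows; the Go struct and helper names are assumptions mirroring the JSON above:

```go
package main

import (
    "encoding/json"
    "fmt"
)

// MatchConfig mirrors the match_config block.
type MatchConfig struct {
    Threshold            float64  `json:"threshold"`
    AutoConfirmThreshold *float64 `json:"auto_confirm_threshold"` // nil = disabled
    CrossWorkstream      []string `json:"cross_workstream"`
}

// parseMatchConfig applies the documented default threshold of 0.72
// when the workstream omits it (or has no config at all).
func parseMatchConfig(raw []byte) (MatchConfig, error) {
    cfg := MatchConfig{Threshold: 0.72}
    if len(raw) == 0 {
        return cfg, nil
    }
    err := json.Unmarshal(raw, &cfg)
    return cfg, err
}

func main() {
    cfg, _ := parseMatchConfig([]byte(`{"cross_workstream":["Legal"]}`))
    fmt.Println(cfg.Threshold, cfg.CrossWorkstream, cfg.AutoConfirmThreshold == nil)
    // 0.72 [Legal] true
}
```

Using a pointer for auto_confirm_threshold distinguishes "not set" (null) from an explicit value, matching the table's null default.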

5.3 Threshold Tuning Guidance

| Workstream Type | Recommended Threshold | Rationale |
|---|---|---|
| Finance | 0.72 | Standard M&A requests, well-defined terminology |
| Legal | 0.68 | Verbose requests with boilerplate; semantic core is smaller |
| IT | 0.75 | Technical specificity matters; false positives costly |
| HR | 0.70 | Mix of standard and org-specific terms |
| Operations | 0.72 | General business terminology |

5.4 Calibration Process

After initial deal data:

  1. Export all (request, answer, human_confirmed) tuples
  2. Compute score distribution for confirmed vs. rejected matches
  3. Adjust threshold to maximize F1 score per workstream
  4. Log threshold changes to audit for compliance

6. Cross-Workstream Matching

6.1 Use Case

An IT request ("Describe cybersecurity insurance coverage") may be answered by a Legal answer (cyber liability policy document).

Without cross-workstream matching, the IT buyer never sees the Legal answer.

6.2 Opt-In Per Workstream Pair

Configured in each workstream's match_config.cross_workstream:

// IT workstream
{
  "match_config": {
    "cross_workstream": ["Legal"]  // IT requests search Legal answers
  }
}

// Legal workstream  
{
  "match_config": {
    "cross_workstream": ["IT", "Finance"]  // Legal requests search IT and Finance
  }
}

Relationship is directional: IT searching Legal doesn't imply Legal searches IT.

6.3 RBAC Interaction

Cross-workstream matching only returns answers the requester can access:

func GetMatchableAnswers(ctx context.Context, actorID, requestWorkstreamID string) ([]Answer, error) {
    // 1. Get workstream config
    ws, _ := getWorkstream(requestWorkstreamID)
    
    // 2. Build workstream list (self + cross)
    workstreams := []string{ws.ID}
    workstreams = append(workstreams, ws.MatchConfig.CrossWorkstream...)
    
    // 3. Filter by access (RBAC)
    var accessible []string
    for _, wsID := range workstreams {
        if hasAccess(actorID, wsID, "read") {
            accessible = append(accessible, wsID)
        }
    }
    
    // 4. Get published answers from accessible workstreams
    return getPublishedAnswers(accessible)
}

7. Request Deduplication (Auto-Suggest Existing Answers)

7.1 The Problem

Buyer B asks the same question Buyer A already got answered. Without dedup:

  • Seller does duplicate work
  • IB reviews duplicate request
  • Buyer B waits when answer already exists

7.2 Dedup Flow

Buyer B submits request
  → Embed request
  → Search published answers (same logic as section 3)
  → If match score ≥ threshold:
      → Show Buyer B: "Similar answer already exists — view it?"
      → If Buyer B accepts: link request to existing answer, mark resolved
      → If Buyer B declines: proceed with normal request flow

7.3 UX Considerations

Don't block submission. Show suggestion after submit, not as a gate:

┌─────────────────────────────────────────────────────────────┐
│ Your request has been submitted.                            │
│                                                              │
│ 💡 We found a similar published answer that may help:        │
│                                                              │
│    "FY2024 Audited Financial Statements"                    │
│    Published: 2026-02-15 | Similarity: 89%                  │
│                                                              │
│    [View Answer]  [This doesn't answer my question]         │
└─────────────────────────────────────────────────────────────┘

7.4 Data Model

When buyer accepts the suggestion:

INSERT INTO answer_links (answer_id, request_id, linked_by, linked_at, status, ai_score)
VALUES (?, ?, ?, ?, 'self_confirmed', ?);
-- status = 'self_confirmed' means buyer accepted the AI suggestion
-- no IB review required

When buyer declines:

INSERT INTO answer_links (answer_id, request_id, linked_by, linked_at, status, ai_score)
VALUES (?, ?, ?, ?, 'rejected_by_requester', ?);
-- Prevents suggesting this answer again for this request
-- Request proceeds to normal IB/Seller workflow

8. Race Condition Fixes (DB Transactions)

8.1 The Problem (from SPEC-REVIEW §3)

Without transactions:

  1. IB confirms match R1↔A1
  2. Concurrently, IB publishes A1
  3. The publish path fires a broadcast while the confirm is in flight
  4. The confirm completes and broadcasts again
  5. Result: duplicate notifications, or worse, inconsistent state

8.2 Transaction Boundaries

Atomic operation 1: Confirm Match

func ConfirmMatch(ctx context.Context, answerID, requestID, actorID string) error {
    return db.Transaction(func(tx *sql.Tx) error {
        // 1. Verify answer exists and is published
        answer, err := getAnswer(tx, answerID)
        if err != nil {
            return err // don't mask DB errors as "not published"
        }
        if answer.Status != "published" {
            return ErrAnswerNotPublished
        }
        
        // 2. Update answer_link status
        err = updateAnswerLink(tx, answerID, requestID, "confirmed", actorID)
        if err != nil {
            return err
        }
        
        // 3. Broadcast (idempotent)
        err = BroadcastAnswer(ctx, tx, answerID)
        if err != nil {
            return err
        }
        
        return nil
    })
}

Atomic operation 2: Publish Answer

func PublishAnswer(ctx context.Context, answerID, actorID string) error {
    return db.Transaction(func(tx *sql.Tx) error {
        // 1. Update answer status
        err := updateAnswerStatus(tx, answerID, "published", actorID)
        if err != nil {
            return err
        }
        
        // 2. Update entry stage to dataroom
        err = updateEntryStage(tx, answerID, "dataroom")
        if err != nil {
            return err
        }
        
        // 3. Run retroactive matching (creates pending answer_links)
        matches, err := MatchAnswerToRequests(ctx, answerID)
        if err != nil {
            return err
        }
        
        for _, m := range matches {
            err = insertAnswerLink(tx, answerID, m.RequestID, "pending", m.Score)
            if err != nil {
                return err
            }
        }
        
        // 4. Broadcast already-confirmed links (if any pre-existed)
        err = BroadcastAnswer(ctx, tx, answerID)
        if err != nil {
            return err
        }
        
        return nil
    })
}

8.3 Optimistic Locking

Add version column to prevent concurrent modifications:

ALTER TABLE entries ADD COLUMN version INTEGER NOT NULL DEFAULT 1;

The status update then takes the caller's expected version, reports a conflict if another writer got there first, and returns the new version:

func updateAnswerStatus(tx *sql.Tx, answerID, status string, expectedVersion int) (int, error) {
    result, err := tx.Exec(`
        UPDATE entries 
        SET data = json_set(data, '$.status', ?),
            version = version + 1,
            updated_at = ?
        WHERE entry_id = ? AND version = ?
    `, status, time.Now().UnixMilli(), answerID, expectedVersion)
    
    if err != nil {
        return 0, err
    }
    
    rows, _ := result.RowsAffected()
    if rows == 0 {
        return 0, ErrConcurrentModification
    }
    
    return expectedVersion + 1, nil
}

9. SQLite Cosine Similarity & Qdrant Migration

9.1 Pure Go Cosine Similarity

SQLite doesn't have native vector operations. Compute in Go:

// CosineSimilarity computes similarity between two vectors.
// Vectors must be same length. Returns value in [-1, 1].
func CosineSimilarity(a, b []float32) float32 {
    if len(a) != len(b) {
        panic("vector length mismatch")
    }
    
    var dotProduct, normA, normB float32
    for i := range a {
        dotProduct += a[i] * b[i]
        normA += a[i] * a[i]
        normB += b[i] * b[i]
    }
    
    if normA == 0 || normB == 0 {
        return 0
    }
    
    return dotProduct / (float32(math.Sqrt(float64(normA))) * float32(math.Sqrt(float64(normB))))
}

// BatchCosineSimilarity computes query vs all candidates.
// The tight loops compile to efficient code; pre-normalizing candidate
// vectors at write time would eliminate the per-candidate norm pass.
func BatchCosineSimilarity(query []float32, candidates [][]float32) []float32 {
    scores := make([]float32, len(candidates))
    
    // Pre-compute query norm
    var queryNorm float32
    for _, v := range query {
        queryNorm += v * v
    }
    queryNorm = float32(math.Sqrt(float64(queryNorm)))
    
    for i, candidate := range candidates {
        var dot, candNorm float32
        for j := range query {
            dot += query[j] * candidate[j]
            candNorm += candidate[j] * candidate[j]
        }
        candNorm = float32(math.Sqrt(float64(candNorm)))
        
        if queryNorm == 0 || candNorm == 0 {
            scores[i] = 0
        } else {
            scores[i] = dot / (queryNorm * candNorm)
        }
    }
    
    return scores
}

9.2 Performance Characteristics (SQLite + Go)

| Embeddings | Load Time | Search Time | Memory |
|---|---|---|---|
| 1,000 | 50ms | 2ms | 3 MB |
| 10,000 | 500ms | 20ms | 30 MB |
| 100,000 | 5s | 200ms | 300 MB |
| 1,000,000 | 50s | 2s | 3 GB |

Acceptable for MVP: Most deals have <10,000 documents. Search under 100ms is fine.

9.3 Qdrant Migration Threshold

Migrate to Qdrant when:

  1. Embedding count > 100,000 — search latency exceeds 200ms
  2. Memory pressure — embeddings consume >500MB RAM
  3. Multi-tenancy — need isolated collections per client (compliance)

9.4 Qdrant Integration (Future)

type VectorStore interface {
    Upsert(id string, vector []float32, metadata map[string]any) error
    Search(query []float32, filter map[string]any, limit int) ([]SearchResult, error)
    Delete(id string) error
}

// SQLiteVectorStore implements VectorStore using embeddings table
type SQLiteVectorStore struct { ... }

// QdrantVectorStore implements VectorStore using Qdrant API
type QdrantVectorStore struct { ... }

Abstract behind interface now; swap implementation later without code changes.
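A minimal in-memory implementation shows the shape both stores share. The interface is narrowed here for brevity (metadata filtering and Delete omitted), and the SearchResult fields are assumptions:

```go
package main

import (
    "fmt"
    "math"
    "sort"
)

// SearchResult carries an entry ID and its similarity score.
type SearchResult struct {
    ID    string
    Score float32
}

// VectorStore is a narrowed version of the section 9.4 interface.
type VectorStore interface {
    Upsert(id string, vector []float32) error
    Search(query []float32, limit int) ([]SearchResult, error)
}

// memStore is a brute-force in-memory store, the same shape a
// SQLiteVectorStore takes after loading the embeddings table.
type memStore struct{ vecs map[string][]float32 }

var _ VectorStore = (*memStore)(nil) // compile-time interface check

func (m *memStore) Upsert(id string, v []float32) error { m.vecs[id] = v; return nil }

func (m *memStore) Search(q []float32, limit int) ([]SearchResult, error) {
    var out []SearchResult
    for id, v := range m.vecs {
        out = append(out, SearchResult{id, cosine(q, v)})
    }
    sort.Slice(out, func(i, j int) bool { return out[i].Score > out[j].Score })
    if len(out) > limit {
        out = out[:limit]
    }
    return out, nil
}

// cosine duplicates section 9.1 to keep the sketch self-contained.
func cosine(a, b []float32) float32 {
    var dot, na, nb float32
    for i := range a {
        dot, na, nb = dot+a[i]*b[i], na+a[i]*a[i], nb+b[i]*b[i]
    }
    if na == 0 || nb == 0 {
        return 0
    }
    return dot / (float32(math.Sqrt(float64(na))) * float32(math.Sqrt(float64(nb))))
}

func main() {
    s := &memStore{vecs: map[string][]float32{}}
    s.Upsert("a1", []float32{1, 0})
    s.Upsert("a2", []float32{0, 1})
    res, _ := s.Search([]float32{1, 0.1}, 1)
    fmt.Println(res[0].ID) // a1
}
```

The `var _ VectorStore = (*memStore)(nil)` line is the idiomatic compile-time check that an implementation satisfies the interface; QdrantVectorStore would carry the same line.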

9.5 Hybrid Mode (Transition)

During migration:

  1. Write to both SQLite and Qdrant
  2. Read from Qdrant (with SQLite fallback)
  3. Validate results match for first 1000 queries
  4. Drop SQLite embeddings table after validation

10. Privacy: Plaintext Only, Never Files

10.1 Embedding Content Policy

ALLOWED to embed:

  • Request title
  • Request body text
  • Answer title
  • Answer body text (the explanation, not file contents)
  • Workstream name (context)

NEVER embed:

  • File contents (PDF, DOCX, XLSX, images)
  • File names (may contain deal names, party names)
  • Internal comments
  • Routing/assignment metadata
  • User names or email addresses

10.2 Why No File Embedding?

  1. Privacy: M&A documents contain material non-public information. Sending to ANY external API (even zero-retention) creates compliance risk.

  2. Size: A single PDF may be 100+ pages. Embedding would require chunking, storage, and search across potentially millions of chunks. Overkill for request-matching.

  3. Semantic mismatch: Request asks "audited financials for FY2024." The answer body says "Please find attached the FY2024 audited financial statements." The body text + title is sufficient for matching — we don't need to embed page 47 of the PDF.

10.3 Future: On-Premise OCR + Embedding

If file-level search becomes required:

  1. Run OCR on-premise (GLM-OCR on forge, not external API)
  2. Store extracted text in entry.data (encrypted at rest)
  3. Embed extracted text (still goes to Fireworks, but it's our extracted text, not raw file)

This is out of scope for MVP.

10.4 Audit Trail

Log every embedding request for compliance:

CREATE TABLE embed_audit (
    id         TEXT PRIMARY KEY,
    entry_id   TEXT NOT NULL,
    text_hash  TEXT NOT NULL,     -- SHA-256 of text sent
    text_len   INTEGER NOT NULL,  -- character count
    model      TEXT NOT NULL,
    requested_at INTEGER NOT NULL,
    latency_ms INTEGER,
    success    INTEGER NOT NULL
);

Do NOT log the actual text — that defeats the privacy purpose. Log the hash for correlation if needed.


11. lib/embed.go — Function Signatures

11.1 Public API

package lib

import (
    "context"
    "database/sql"
    "errors"
    "sync"
    "time"
)

// EmbedConfig holds embedding service configuration.
type EmbedConfig struct {
    APIKey      string  // FIREWORKS_API_KEY
    Endpoint    string  // defaults to Fireworks endpoint
    Model       string  // defaults to nomic-embed-text-v1.5
    Timeout     time.Duration
    MaxRetries  int
}

// EmbedResult contains the embedding and metadata.
type EmbedResult struct {
    Vector    []float32
    TextHash  string  // SHA-256 of input text
    Model     string
    TokenCount int
    LatencyMs int64
}

// MatchResult represents a potential match with score.
type MatchResult struct {
    EntryID   string
    ChunkIdx  int
    Score     float32
    EntryType string  // "request" | "answer"
}

// Embed generates an embedding for the given text.
// Returns ErrTextTooLong if text exceeds model context.
// Returns ErrEmptyText if text is empty or whitespace only.
func Embed(ctx context.Context, cfg *EmbedConfig, text string) (*EmbedResult, error)

// EmbedBatch generates embeddings for multiple texts.
// More efficient than calling Embed in a loop (single API call).
// Max 100 texts per batch.
func EmbedBatch(ctx context.Context, cfg *EmbedConfig, texts []string) ([]*EmbedResult, error)

// EmbedRequest creates and stores embedding for a request entry.
// Idempotent: skips if embedding exists and text_hash matches.
func EmbedRequest(ctx context.Context, db *sql.DB, cfg *EmbedConfig, requestID string) error

// EmbedAnswer creates and stores embedding(s) for an answer entry.
// May produce multiple chunks for long answers.
// Idempotent: skips chunks where text_hash matches.
func EmbedAnswer(ctx context.Context, db *sql.DB, cfg *EmbedConfig, answerID string) error

// MatchRequestToAnswers finds published answers matching the request.
// Returns matches above the workstream's configured threshold.
// Respects cross-workstream config and RBAC.
func MatchRequestToAnswers(ctx context.Context, db *sql.DB, actorID, requestID string) ([]MatchResult, error)

// MatchAnswerToRequests finds open requests matching the answer.
// Returns matches above the workstream's configured threshold.
// Respects cross-workstream config and RBAC.
func MatchAnswerToRequests(ctx context.Context, db *sql.DB, actorID, answerID string) ([]MatchResult, error)

// FindDuplicateRequests finds existing requests similar to the given text.
// Used for deduplication suggestions before/after submission.
func FindDuplicateRequests(ctx context.Context, db *sql.DB, actorID, workstreamID, text string) ([]MatchResult, error)

// CosineSimilarity computes similarity between two vectors.
func CosineSimilarity(a, b []float32) float32

// DeleteEmbeddings removes all embeddings for an entry.
// Called when entry is deleted.
func DeleteEmbeddings(ctx context.Context, db *sql.DB, entryID string) error

// RefreshEmbedding re-embeds an entry if content changed.
// Compares text_hash to detect changes.
// Returns true if embedding was updated.
func RefreshEmbedding(ctx context.Context, db *sql.DB, cfg *EmbedConfig, entryID string) (bool, error)

11.2 Async Embedding on Publish

Embedding should not block the user action. Use async processing:

// EmbedQueue is a background worker that processes embedding requests.
type EmbedQueue struct {
    cfg    *EmbedConfig
    db     *sql.DB
    queue  chan embedJob
    wg     sync.WaitGroup
}

type embedJob struct {
    EntryID   string
    EntryType string  // "request" | "answer"
    Priority  int     // 0 = normal, 1 = high (new request needs matching)
}

// Start begins processing the embedding queue.
// Workers defaults to 2 (Fireworks rate limit friendly).
func (q *EmbedQueue) Start(workers int)

// Stop gracefully shuts down the queue.
func (q *EmbedQueue) Stop()

// Enqueue adds an entry for embedding.
// Non-blocking; returns immediately.
func (q *EmbedQueue) Enqueue(entryID, entryType string, priority int)

11.3 Integration Points

On Request Create:

func HandleCreateRequest(w http.ResponseWriter, r *http.Request) {
    // ... validation, RBAC, insert entry ...
    
    // Queue embedding (non-blocking)
    embedQueue.Enqueue(request.ID, "request", 1)  // high priority
    
    // Return success immediately
    respondJSON(w, request)
}

On Answer Publish:

func HandlePublishAnswer(w http.ResponseWriter, r *http.Request) {
    err := db.Transaction(func(tx *sql.Tx) error {
        // ... update status, stage ...
        
        // Embedding happens inline for matching (within transaction timeout)
        err := EmbedAnswer(ctx, tx, cfg, answer.ID)
        if err != nil {
            // Log but don't fail — matching can happen later
            log.Warn("embedding failed, will retry", "error", err)
        }
        
        // Match and create answer_links
        matches, _ := MatchAnswerToRequests(ctx, tx, actorID, answer.ID)
        for _, m := range matches {
            insertAnswerLink(tx, answer.ID, m.EntryID, "pending", m.Score)
        }
        
        // Broadcast confirmed links
        return BroadcastAnswer(ctx, tx, answer.ID)
    })
    if err != nil {
        http.Error(w, "publish failed", http.StatusInternalServerError)
        return
    }
    
    respondJSON(w, answer)
}

11.4 Error Handling

var (
    ErrTextTooLong       = errors.New("text exceeds model context limit")
    ErrEmptyText         = errors.New("text is empty or whitespace only")
    ErrEmbeddingFailed   = errors.New("embedding API call failed")
    ErrRateLimited       = errors.New("embedding API rate limited")
    ErrNoEmbedding       = errors.New("entry has no embedding")
    ErrWorkstreamConfig  = errors.New("workstream missing match configuration")
)

Retry policy for transient errors:

  • ErrRateLimited: exponential backoff, max 3 retries
  • ErrEmbeddingFailed: retry once after 1s
  • All others: fail immediately

12. answer_links Schema

Incorporates SPEC-REVIEW feedback on rejection tracking:

CREATE TABLE answer_links (
    answer_id    TEXT NOT NULL REFERENCES entries(entry_id),
    request_id   TEXT NOT NULL REFERENCES entries(entry_id),
    
    -- Who created the link
    linked_by    TEXT NOT NULL,
    linked_at    INTEGER NOT NULL,
    
    -- AI matching metadata
    ai_score     REAL,           -- cosine similarity at time of match
    ai_model     TEXT,           -- model used for matching
    
    -- Review status
    status       TEXT NOT NULL DEFAULT 'pending',
        -- 'pending': AI suggested, awaiting human review
        -- 'confirmed': IB confirmed the match
        -- 'rejected': IB rejected the match
        -- 'self_confirmed': requester accepted dedup suggestion
        -- 'rejected_by_requester': requester declined dedup suggestion
    
    reviewed_by  TEXT,           -- who reviewed (if status != pending)
    reviewed_at  INTEGER,        -- when reviewed
    reject_reason TEXT,          -- why rejected (if status = rejected)
    
    PRIMARY KEY (answer_id, request_id)
);

CREATE INDEX idx_links_answer ON answer_links(answer_id);
CREATE INDEX idx_links_request ON answer_links(request_id);
CREATE INDEX idx_links_status ON answer_links(status);

13. Summary: What Gets Built

| Component | Location | Purpose |
|---|---|---|
| lib/embed.go | Core embedding logic | API calls, similarity, storage |
| embeddings table | Schema | Vector storage |
| broadcasts table | Schema | Idempotency |
| answer_links | Schema update | Status + rejection tracking |
| embed_audit table | Schema | Compliance logging |
| EmbedQueue | Background worker | Async processing |
| Workstream config | Entry.Data | Per-workstream thresholds |

Not built (future):

  • Qdrant integration (interface defined, impl deferred)
  • File content embedding (privacy: out of scope)
  • Auto-confirm (threshold defined, feature disabled for MVP)

This document extends SPEC.md. If conflicts exist, discuss before implementing.