dealspace/EMBED-SPEC.md


# Dealspace — AI Matching & Embedding Specification
**Version:** 0.1 — 2026-02-28
**Status:** Pre-implementation. Addresses SPEC-REVIEW.md section 3 (race conditions) and section 7.4 (magic threshold).
---
## 1. Embedding Model Selection
### 1.1 Candidates
| Model | Provider | Dimensions | Context | Latency | Cost | Retention |
|-------|----------|------------|---------|---------|------|-----------|
| `nomic-embed-text-v1.5` | Fireworks | 768 | 8192 | ~50ms | $0.008/1M | Zero |
| `voyage-finance-2` | Voyage AI | 1024 | 16000 | ~80ms | $0.12/1M | 30 days |
### 1.2 Domain Analysis
M&A requests contain:
- **Financial terminology:** EBITDA, working capital adjustments, earnout provisions, rep & warranty
- **Legal terminology:** indemnification, IP assignments, material adverse change, disclosure schedules
- **Industry-specific terms:** varies by deal (tech: ARR, churn; healthcare: HIPAA, 340B; manufacturing: capex, inventory turns)
**Voyage-finance-2** was trained specifically on financial documents (10-Ks, credit agreements, M&A filings). It shows ~8% improvement on financial similarity benchmarks vs. general-purpose models.
**Nomic-embed-text-v1.5** is general-purpose but performs well on semantic matching. Zero retention is critical for M&A confidentiality.
### 1.3 Recommendation
**Use Fireworks nomic-embed-text-v1.5** for MVP:
1. **Zero retention** — Voyage's 30-day retention is unacceptable for M&A data
2. **15x cheaper** — allows generous matching without cost concerns
3. **Proven stack** — same infrastructure as inou (known good)
4. **Good enough** — general semantic similarity works for request matching; we're matching "audited financials" to "FY2024 audit report," not parsing covenant calculations
**Revisit voyage-finance-2 when:**
- Voyage offers a zero-retention API option
- Match quality metrics show <85% human confirmation rate
- We expand to quantitative matching (finding answers by numerical similarity)
### 1.4 Fireworks Configuration
```go
const (
	EmbedModel     = "nomic-embed-text-v1.5"
	EmbedEndpoint  = "https://api.fireworks.ai/inference/v1/embeddings"
	EmbedDimension = 768
	EmbedMaxTokens = 8192 // model context limit
)
```
API key stored in environment: `FIREWORKS_API_KEY`
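A minimal sketch of the request payload, assuming the Fireworks endpoint accepts the OpenAI-compatible `{"model", "input"}` shape (verify field names against the current Fireworks API docs; `buildEmbedPayload` and `embedRequest` are illustrative names, not part of the spec):

```go
import "encoding/json"

// embedRequest mirrors the OpenAI-compatible payload shape assumed for the
// Fireworks embeddings endpoint.
type embedRequest struct {
	Model string   `json:"model"`
	Input []string `json:"input"`
}

// buildEmbedPayload marshals one or more texts into a request body.
func buildEmbedPayload(model string, texts []string) ([]byte, error) {
	return json.Marshal(embedRequest{Model: model, Input: texts})
}
```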
---
## 2. What to Embed
### 2.1 Request Embedding
Embed the **semantic intent**, not raw fields.
```go
func BuildRequestEmbedText(r *Request, ws *Workstream) string {
	var b strings.Builder
	// Workstream context (aids cross-workstream relevance)
	b.WriteString("Workstream: ")
	b.WriteString(ws.Name)
	b.WriteString("\n\n")
	// Title is high-signal
	b.WriteString("Request: ")
	b.WriteString(r.Title)
	b.WriteString("\n\n")
	// Body provides detail
	if r.Body != "" {
		b.WriteString("Details: ")
		b.WriteString(r.Body)
	}
	return strings.TrimSpace(b.String())
}
```
**Do NOT embed:**
- Request ID, ref numbers (non-semantic)
- Due dates, priority (operational, not semantic)
- Assignee names (PII, not relevant to matching)
- Status (changes frequently, embedding is point-in-time)
### 2.2 Answer Embedding
Answers may be long (multi-document explanations). Use chunking with overlap.
```go
const (
	AnswerChunkSize    = 1500 // tokens (~6000 chars)
	AnswerChunkOverlap = 200  // tokens of overlap for context continuity
)

func BuildAnswerEmbedTexts(a *Answer, ws *Workstream) []string {
	// Prefix every chunk with context
	prefix := fmt.Sprintf("Workstream: %s\nAnswer: %s\n\n", ws.Name, a.Title)
	body := a.Body
	if len(body) <= AnswerChunkSize*4 { // ~6000 chars = 1 chunk
		return []string{prefix + body}
	}
	// Chunk long answers
	var chunks []string
	for start := 0; start < len(body); {
		end := start + AnswerChunkSize*4
		if end >= len(body) {
			// Final chunk: emit and stop (stepping back by the overlap
			// here would re-process the tail forever)
			chunks = append(chunks, prefix+body[start:])
			break
		}
		// Prefer a sentence boundary near the end of the chunk
		if idx := strings.LastIndex(body[start:end], ". "); idx > 0 {
			end = start + idx + 1
		}
		chunks = append(chunks, prefix+body[start:end])
		// Step back for overlap, but always make forward progress
		next := end - AnswerChunkOverlap*4
		if next <= start {
			next = end
		}
		start = next
	}
	return chunks
}
```
**Do NOT embed:**
- File contents (privacy: see section 10)
- File names (may contain sensitive identifiers)
- Rejection reasons (operational, not semantic)
- Internal comments (IB-only context)
### 2.3 Embedding Storage
```sql
CREATE TABLE embeddings (
id TEXT PRIMARY KEY, -- UUID
entry_id TEXT NOT NULL, -- request or answer entry_id
chunk_idx INTEGER NOT NULL DEFAULT 0, -- 0 for single-chunk, 0..N for multi-chunk
vector BLOB NOT NULL, -- 768 float32 = 3072 bytes
text_hash TEXT NOT NULL, -- SHA-256 of embedded text (dedup check)
model TEXT NOT NULL, -- "nomic-embed-text-v1.5"
created_at INTEGER NOT NULL,
UNIQUE(entry_id, chunk_idx)
);
CREATE INDEX idx_embeddings_entry ON embeddings(entry_id);
```
**Note:** Vector stored as raw `float32` bytes (little-endian). Cosine similarity computed in Go, not SQL.
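The little-endian BLOB convention implies a pair of encode/decode helpers along these lines (a sketch; `EncodeVector`/`DecodeVector` are illustrative names):

```go
import (
	"encoding/binary"
	"math"
)

// EncodeVector serializes a vector as little-endian float32 bytes for the
// embeddings.vector BLOB column (768 floats -> 3072 bytes).
func EncodeVector(v []float32) []byte {
	buf := make([]byte, 4*len(v))
	for i, f := range v {
		binary.LittleEndian.PutUint32(buf[i*4:], math.Float32bits(f))
	}
	return buf
}

// DecodeVector is the inverse of EncodeVector.
func DecodeVector(b []byte) []float32 {
	v := make([]float32, len(b)/4)
	for i := range v {
		v[i] = math.Float32frombits(binary.LittleEndian.Uint32(b[i*4:]))
	}
	return v
}
```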
---
## 3. Retroactive Matching (Bidirectional)
### 3.1 The Problem (from SPEC-REVIEW §3.1)
The original spec only describes matching when a buyer submits a request. But:
- **New request** should search existing published answers
- **New answer** (when published) should search open requests
Both directions are required for complete coverage.
### 3.2 Matching Directions
```
Direction 1: Request → Answers
Trigger: Request created OR request text updated
Search: All published answers in accessible workstreams
Output: Suggested answer_links with ai_score
Direction 2: Answer → Requests
Trigger: Answer published (stage = dataroom)
Search: All open requests in accessible workstreams
Output: Suggested answer_links with ai_score
```
### 3.3 Implementation
```go
// Called when a request is created or its body/title changes
func MatchRequestToAnswers(ctx context.Context, requestID string) ([]AnswerMatch, error) {
	// 1. Get request embedding (create if missing)
	// 2. Load all published answer embeddings in accessible workstreams
	// 3. Cosine similarity against each
	// 4. Return matches above threshold
}

// Called when an answer is published
func MatchAnswerToRequests(ctx context.Context, answerID string) ([]RequestMatch, error) {
	// 1. Get answer embedding(s) (multi-chunk: use max score across chunks)
	// 2. Load all open request embeddings in accessible workstreams
	// 3. Cosine similarity against each
	// 4. Return matches above threshold
}
```
### 3.4 Matching on Update
If a request body is edited:
1. Recompute the embedding (compare `text_hash`; skip if unchanged)
2. Re-run matching
3. New suggestions appear; existing confirmed links preserved
If an answer body is edited before publish:
- No action (draft state)
If an answer is re-published (correction):
- Re-run matching
- Flag for human review if new requests match
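The `text_hash` comparison used to skip unchanged content can be a plain SHA-256 hex digest of the rebuilt embed text (a sketch; `TextHash` is an illustrative name):

```go
import (
	"crypto/sha256"
	"encoding/hex"
)

// TextHash returns the SHA-256 hex digest stored in embeddings.text_hash;
// re-embedding is skipped when the stored hash matches the rebuilt text.
func TextHash(text string) string {
	sum := sha256.Sum256([]byte(text))
	return hex.EncodeToString(sum[:])
}
```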
---
## 4. Broadcast Idempotency
### 4.1 The Problem (from SPEC-REVIEW §3.2)
Multiple requests can link to the same answer. Without idempotency:
- Confirming R1A1 sends broadcast to Buyer A
- Confirming R2A1 sends another broadcast to Buyer B
- If Buyer A also asked R2... they get two notifications
### 4.2 Broadcasts Table
```sql
CREATE TABLE broadcasts (
id TEXT PRIMARY KEY,
answer_id TEXT NOT NULL REFERENCES entries(entry_id),
request_id TEXT NOT NULL REFERENCES entries(entry_id),
recipient_id TEXT NOT NULL REFERENCES users(id),
channel TEXT NOT NULL, -- "web" | "email" | "slack" | "teams"
sent_at INTEGER NOT NULL,
UNIQUE(answer_id, request_id, recipient_id, channel)
);
CREATE INDEX idx_broadcasts_answer ON broadcasts(answer_id);
CREATE INDEX idx_broadcasts_recipient ON broadcasts(recipient_id);
```
### 4.3 Broadcast Logic
```go
func BroadcastAnswer(ctx context.Context, tx *sql.Tx, answerID string) error {
	// 1. Get all confirmed answer_links for this answer
	links, err := getConfirmedLinks(tx, answerID)
	if err != nil {
		return err
	}
	// 2. For each link, get the request's origin_id (ultimate requester)
	recipients := make(map[string][]string) // user_id -> []request_id
	for _, link := range links {
		req, err := getRequest(tx, link.RequestID)
		if err != nil {
			return err
		}
		recipients[req.OriginID] = append(recipients[req.OriginID], link.RequestID)
	}
	// 3. For each recipient, check idempotency and send
	for userID, requestIDs := range recipients {
		for _, reqID := range requestIDs {
			// Skip if already sent (idempotent)
			exists, err := broadcastExists(tx, answerID, reqID, userID, "web")
			if err != nil {
				return err
			}
			if exists {
				continue
			}
			// Record broadcast
			if err := insertBroadcast(tx, answerID, reqID, userID, "web", time.Now().UnixMilli()); err != nil {
				return err
			}
			// Queue notification (delivery happens outside the transaction)
			NotifyQueue.Push(Notification{
				UserID:    userID,
				AnswerID:  answerID,
				RequestID: reqID,
			})
		}
	}
	return nil
}
```
### 4.4 Notification Deduplication
Even with idempotency per (answer, request, recipient), a user might get multiple notifications if they asked 5 equivalent questions.
**User-facing behavior:** Collapse into single notification:
```
"Answer published: [Title] — resolves 5 of your requests"
```
This is a presentation concern, not a data model change. The `broadcasts` table tracks each link; the notification renderer collapses them.
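The collapsing step amounts to grouping queued notifications by (recipient, answer). A sketch, with `Notification` redeclared from section 4.3 for self-containment:

```go
// Notification mirrors the queue payload from section 4.3.
type Notification struct {
	UserID    string
	AnswerID  string
	RequestID string
}

// CollapseNotifications groups queued notifications by (recipient, answer)
// so one answer that resolves N requests renders as a single message.
// The broadcasts table still records each individual link.
func CollapseNotifications(ns []Notification) map[[2]string][]string {
	out := make(map[[2]string][]string) // {userID, answerID} -> request IDs
	for _, n := range ns {
		key := [2]string{n.UserID, n.AnswerID}
		out[key] = append(out[key], n.RequestID)
	}
	return out
}
```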
---
## 5. Configurable Similarity Threshold
### 5.1 The Problem (from SPEC-REVIEW §7.4)
Hardcoded 0.72 is a magic number that:
- May be too strict for some workstreams (legal requests are verbose)
- May be too loose for others (financial requests are terse)
- Cannot be tuned without code changes
### 5.2 Per-Workstream Configuration
Add to workstream entry's Data:
```json
{
"name": "Finance",
"match_config": {
"threshold": 0.72,
"auto_confirm_threshold": 0.95,
"cross_workstream": ["Legal"]
}
}
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `threshold` | float | 0.72 | Minimum score to suggest match |
| `auto_confirm_threshold` | float | null | If set, scores above this auto-confirm (no human review) |
| `cross_workstream` | []string | [] | Workstream slugs to include in matching (see section 6) |
### 5.3 Threshold Tuning Guidance
| Workstream Type | Recommended Threshold | Rationale |
|-----------------|----------------------|-----------|
| Finance | 0.72 | Standard M&A requests, well-defined terminology |
| Legal | 0.68 | Verbose requests with boilerplate, semantic core is smaller |
| IT | 0.75 | Technical specificity matters, false positives costly |
| HR | 0.70 | Mix of standard and org-specific terms |
| Operations | 0.72 | General business terminology |
### 5.4 Calibration Process
After initial deal data:
1. Export all (request, answer, human_confirmed) tuples
2. Compute score distribution for confirmed vs. rejected matches
3. Adjust threshold to maximize F1 score per workstream
4. Log threshold changes to audit for compliance
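Step 3 can be sketched as a threshold sweep over the exported tuples (illustrative names; a real calibration would also report precision/recall per candidate):

```go
// ScoredPair is one exported (score, human_confirmed) tuple from step 1.
type ScoredPair struct {
	Score     float64
	Confirmed bool
}

// BestThreshold sweeps candidate thresholds and returns the one that
// maximizes F1 on the labeled pairs, along with that F1 value.
func BestThreshold(pairs []ScoredPair, candidates []float64) (best, bestF1 float64) {
	for _, t := range candidates {
		var tp, fp, fn float64
		for _, p := range pairs {
			switch {
			case p.Score >= t && p.Confirmed:
				tp++ // suggested and human-confirmed
			case p.Score >= t && !p.Confirmed:
				fp++ // suggested but rejected
			case p.Score < t && p.Confirmed:
				fn++ // missed a confirmed match
			}
		}
		if tp == 0 {
			continue
		}
		f1 := 2 * tp / (2*tp + fp + fn)
		if f1 > bestF1 {
			bestF1, best = f1, t
		}
	}
	return best, bestF1
}
```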
---
## 6. Cross-Workstream Matching
### 6.1 Use Case
An IT request ("Describe cybersecurity insurance coverage") may be answered by a Legal answer (cyber liability policy document).
Without cross-workstream matching, the IT buyer never sees the Legal answer.
### 6.2 Opt-In Per Workstream Pair
Configured in each workstream's `match_config.cross_workstream`:
```json
// IT workstream
{
"match_config": {
"cross_workstream": ["Legal"] // IT requests search Legal answers
}
}
// Legal workstream
{
"match_config": {
"cross_workstream": ["IT", "Finance"] // Legal requests search IT and Finance
}
}
```
**Relationship is directional:** IT searching Legal doesn't imply Legal searches IT.
### 6.3 RBAC Interaction
Cross-workstream matching only returns answers the requester can access:
```go
func GetMatchableAnswers(ctx context.Context, actorID, requestWorkstreamID string) ([]Answer, error) {
	// 1. Get workstream config
	ws, err := getWorkstream(requestWorkstreamID)
	if err != nil {
		return nil, err
	}
	// 2. Build workstream list (self + cross)
	workstreams := []string{ws.ID}
	workstreams = append(workstreams, ws.MatchConfig.CrossWorkstream...)
	// 3. Filter by access (RBAC)
	var accessible []string
	for _, wsID := range workstreams {
		if hasAccess(actorID, wsID, "read") {
			accessible = append(accessible, wsID)
		}
	}
	// 4. Get published answers from accessible workstreams
	return getPublishedAnswers(accessible)
}
```
---
## 7. Request Deduplication (Auto-Suggest Existing Answers)
### 7.1 The Problem
Buyer B asks the same question Buyer A already got answered. Without dedup:
- Seller does duplicate work
- IB reviews duplicate request
- Buyer B waits when answer already exists
### 7.2 Dedup Flow
```
Buyer B submits request
→ Embed request
→ Search published answers (same logic as section 3)
→ If match score ≥ threshold:
→ Show Buyer B: "Similar answer already exists — view it?"
→ If Buyer B accepts: link request to existing answer, mark resolved
→ If Buyer B declines: proceed with normal request flow
```
### 7.3 UX Considerations
**Don't block submission.** Show suggestion after submit, not as a gate:
```
┌─────────────────────────────────────────────────────────────┐
│ Your request has been submitted. │
│ │
│ 💡 We found a similar published answer that may help: │
│ │
│ "FY2024 Audited Financial Statements" │
│ Published: 2026-02-15 | Similarity: 89% │
│ │
│ [View Answer] [This doesn't answer my question] │
└─────────────────────────────────────────────────────────────┘
```
### 7.4 Data Model
When buyer accepts the suggestion:
```sql
INSERT INTO answer_links (answer_id, request_id, linked_by, linked_at, status, ai_score)
VALUES (?, ?, ?, ?, 'self_confirmed', ?);
-- status = 'self_confirmed' means buyer accepted the AI suggestion
-- no IB review required
```
When buyer declines:
```sql
INSERT INTO answer_links (answer_id, request_id, linked_by, linked_at, status, ai_score)
VALUES (?, ?, ?, ?, 'rejected_by_requester', ?);
-- Prevents suggesting this answer again for this request
-- Request proceeds to normal IB/Seller workflow
```
---
## 8. Race Condition Fixes (DB Transactions)
### 8.1 The Problem (from SPEC-REVIEW §3)
Without transactions:
1. IB confirms match R1A1
2. Concurrent: IB publishes A1
3. Broadcast fires during confirm
4. Confirm completes, tries to broadcast again
5. Duplicate notifications result, or, worse, inconsistent state
### 8.2 Transaction Boundaries
**Atomic operation 1: Confirm Match**
```go
func ConfirmMatch(ctx context.Context, answerID, requestID, actorID string) error {
	return db.Transaction(func(tx *sql.Tx) error {
		// 1. Verify answer exists and is published
		answer, err := getAnswer(tx, answerID)
		if err != nil {
			return err
		}
		if answer.Status != "published" {
			return ErrAnswerNotPublished
		}
		// 2. Update answer_link status
		if err := updateAnswerLink(tx, answerID, requestID, "confirmed", actorID); err != nil {
			return err
		}
		// 3. Broadcast (idempotent)
		return BroadcastAnswer(ctx, tx, answerID)
	})
}
```
**Atomic operation 2: Publish Answer**
```go
func PublishAnswer(ctx context.Context, answerID, actorID string) error {
	return db.Transaction(func(tx *sql.Tx) error {
		// 1. Update answer status
		if err := updateAnswerStatus(tx, answerID, "published", actorID); err != nil {
			return err
		}
		// 2. Update entry stage to dataroom
		if err := updateEntryStage(tx, answerID, "dataroom"); err != nil {
			return err
		}
		// 3. Run retroactive matching (creates pending answer_links)
		matches, err := MatchAnswerToRequests(ctx, answerID)
		if err != nil {
			return err
		}
		for _, m := range matches {
			if err := insertAnswerLink(tx, answerID, m.RequestID, "pending", m.Score); err != nil {
				return err
			}
		}
		// 4. Broadcast already-confirmed links (if any pre-existed)
		return BroadcastAnswer(ctx, tx, answerID)
	})
}
```
### 8.3 Optimistic Locking
Add version column to prevent concurrent modifications:
```sql
ALTER TABLE entries ADD COLUMN version INTEGER NOT NULL DEFAULT 1;
```
```go
func updateAnswerStatus(tx *sql.Tx, answerID, status string, expectedVersion int) (int, error) {
	result, err := tx.Exec(`
		UPDATE entries
		SET data = json_set(data, '$.status', ?),
		    version = version + 1,
		    updated_at = ?
		WHERE entry_id = ? AND version = ?
	`, status, time.Now().UnixMilli(), answerID, expectedVersion)
	if err != nil {
		return 0, err
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return 0, err
	}
	if rows == 0 {
		return 0, ErrConcurrentModification
	}
	return expectedVersion + 1, nil
}
```
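Callers of version-checked updates need a bounded retry loop that re-reads state on conflict. A generic sketch (`RetryOnConflict` is an illustrative helper; `ErrConcurrentModification` is redeclared so the snippet is self-contained, and the closure is expected to re-fetch the current version on each attempt):

```go
import "errors"

var ErrConcurrentModification = errors.New("concurrent modification")

// RetryOnConflict retries an optimistic update a bounded number of times,
// stopping early on success or any non-conflict error.
func RetryOnConflict(maxAttempts int, op func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = op(); !errors.Is(err, ErrConcurrentModification) {
			return err
		}
	}
	return err
}
```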
---
## 9. SQLite Cosine Similarity & Qdrant Migration
### 9.1 Pure Go Cosine Similarity
SQLite doesn't have native vector operations. Compute in Go:
```go
// CosineSimilarity computes similarity between two vectors.
// Vectors must be the same length. Returns a value in [-1, 1].
func CosineSimilarity(a, b []float32) float32 {
	if len(a) != len(b) {
		panic("vector length mismatch")
	}
	var dotProduct, normA, normB float32
	for i := range a {
		dotProduct += a[i] * b[i]
		normA += a[i] * a[i]
		normB += b[i] * b[i]
	}
	if normA == 0 || normB == 0 {
		return 0
	}
	return dotProduct / (float32(math.Sqrt(float64(normA))) * float32(math.Sqrt(float64(normB))))
}

// BatchCosineSimilarity scores a query against all candidates.
// Pre-computes the query norm once instead of per candidate.
func BatchCosineSimilarity(query []float32, candidates [][]float32) []float32 {
	scores := make([]float32, len(candidates))
	var queryNorm float32
	for _, v := range query {
		queryNorm += v * v
	}
	queryNorm = float32(math.Sqrt(float64(queryNorm)))
	for i, candidate := range candidates {
		var dot, candNorm float32
		for j := range query {
			dot += query[j] * candidate[j]
			candNorm += candidate[j] * candidate[j]
		}
		candNorm = float32(math.Sqrt(float64(candNorm)))
		if queryNorm == 0 || candNorm == 0 {
			scores[i] = 0
		} else {
			scores[i] = dot / (queryNorm * candNorm)
		}
	}
	return scores
}
```
### 9.2 Performance Characteristics (SQLite + Go)
| Embeddings | Load Time | Search Time | Memory |
|------------|-----------|-------------|--------|
| 1,000 | 50ms | 2ms | 3 MB |
| 10,000 | 500ms | 20ms | 30 MB |
| 100,000 | 5s | 200ms | 300 MB |
| 1,000,000 | 50s | 2s | 3 GB |
**Acceptable for MVP:** Most deals have <10,000 documents. Search under 100ms is fine.
### 9.3 Qdrant Migration Threshold
Migrate to Qdrant when:
1. **Scale:** embedding count exceeds 100,000 and search latency exceeds 200ms
2. **Memory pressure:** embeddings consume more than 500 MB of RAM
3. **Multi-tenancy:** isolated collections per client are needed (compliance)
### 9.4 Qdrant Integration (Future)
```go
type VectorStore interface {
	Upsert(id string, vector []float32, metadata map[string]any) error
	Search(query []float32, filter map[string]any, limit int) ([]SearchResult, error)
	Delete(id string) error
}

// SQLiteVectorStore implements VectorStore using the embeddings table
type SQLiteVectorStore struct{ /* ... */ }

// QdrantVectorStore implements VectorStore using the Qdrant API
type QdrantVectorStore struct{ /* ... */ }
```
Abstract behind the interface now; swap implementations later without touching call sites.
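As a concrete check that the interface is sufficient, here is a brute-force in-memory implementation (a sketch; `SearchResult` fields and the local `cosine` helper are assumptions made so the snippet is self-contained):

```go
import (
	"math"
	"sort"
)

// SearchResult is the assumed shape returned by VectorStore.Search.
type SearchResult struct {
	ID    string
	Score float32
}

type record struct {
	vector   []float32
	metadata map[string]any
}

// MemoryVectorStore is a brute-force, in-memory store useful for tests
// before the SQLite and Qdrant implementations exist.
type MemoryVectorStore struct {
	records map[string]record
}

func NewMemoryVectorStore() *MemoryVectorStore {
	return &MemoryVectorStore{records: map[string]record{}}
}

func (s *MemoryVectorStore) Upsert(id string, vector []float32, metadata map[string]any) error {
	s.records[id] = record{vector, metadata}
	return nil
}

func (s *MemoryVectorStore) Delete(id string) error {
	delete(s.records, id)
	return nil
}

// Search returns the top `limit` records whose metadata matches every
// filter key, ordered by cosine similarity to the query.
func (s *MemoryVectorStore) Search(query []float32, filter map[string]any, limit int) ([]SearchResult, error) {
	var results []SearchResult
	for id, r := range s.records {
		matches := true
		for k, v := range filter {
			if r.metadata[k] != v {
				matches = false
				break
			}
		}
		if !matches {
			continue
		}
		results = append(results, SearchResult{ID: id, Score: cosine(query, r.vector)})
	}
	sort.Slice(results, func(i, j int) bool { return results[i].Score > results[j].Score })
	if len(results) > limit {
		results = results[:limit]
	}
	return results, nil
}

// cosine duplicates CosineSimilarity locally to keep the sketch standalone.
func cosine(a, b []float32) float32 {
	var dot, na, nb float32
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (float32(math.Sqrt(float64(na))) * float32(math.Sqrt(float64(nb))))
}
```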
### 9.5 Hybrid Mode (Transition)
During migration:
1. Write to both SQLite and Qdrant
2. Read from Qdrant (with SQLite fallback)
3. Validate results match for first 1000 queries
4. Drop SQLite embeddings table after validation
---
## 10. Privacy: Plaintext Only, Never Files
### 10.1 Embedding Content Policy
**ALLOWED to embed:**
- Request title
- Request body text
- Answer title
- Answer body text (the explanation, not file contents)
- Workstream name (context)
**NEVER embed:**
- File contents (PDF, DOCX, XLSX, images)
- File names (may contain deal names, party names)
- Internal comments
- Routing/assignment metadata
- User names or email addresses
### 10.2 Why No File Embedding?
1. **Privacy:** M&A documents contain material non-public information. Sending to ANY external API (even zero-retention) creates compliance risk.
2. **Size:** A single PDF may be 100+ pages. Embedding would require chunking, storage, and search across potentially millions of chunks. Overkill for request-matching.
3. **Semantic mismatch:** Request asks "audited financials for FY2024." The answer body says "Please find attached the FY2024 audited financial statements." The body text + title is sufficient for matching — we don't need to embed page 47 of the PDF.
### 10.3 Future: On-Premise OCR + Embedding
If file-level search becomes required:
1. Run OCR on-premise (GLM-OCR on forge, not external API)
2. Store extracted text in `entry.data` (encrypted at rest)
3. Embed extracted text (still goes to Fireworks, but it's our extracted text, not raw file)
This is out of scope for MVP.
### 10.4 Audit Trail
Log every embedding request for compliance:
```sql
CREATE TABLE embed_audit (
id TEXT PRIMARY KEY,
entry_id TEXT NOT NULL,
text_hash TEXT NOT NULL, -- SHA-256 of text sent
text_len INTEGER NOT NULL, -- character count
model TEXT NOT NULL,
requested_at INTEGER NOT NULL,
latency_ms INTEGER,
success INTEGER NOT NULL
);
```
**Do NOT log the actual text** — that defeats the privacy purpose. Log the hash for correlation if needed.
---
## 11. lib/embed.go — Function Signatures
### 11.1 Public API
```go
package lib

import (
	"context"
	"database/sql"
	"time"
)

// EmbedConfig holds embedding service configuration.
type EmbedConfig struct {
	APIKey     string // FIREWORKS_API_KEY
	Endpoint   string // defaults to Fireworks endpoint
	Model      string // defaults to nomic-embed-text-v1.5
	Timeout    time.Duration
	MaxRetries int
}

// EmbedResult contains the embedding and metadata.
type EmbedResult struct {
	Vector     []float32
	TextHash   string // SHA-256 of input text
	Model      string
	TokenCount int
	LatencyMs  int64
}

// MatchResult represents a potential match with score.
type MatchResult struct {
	EntryID   string
	ChunkIdx  int
	Score     float32
	EntryType string // "request" | "answer"
}
// Embed generates an embedding for the given text.
// Returns ErrTextTooLong if text exceeds model context.
// Returns ErrEmptyText if text is empty or whitespace only.
func Embed(ctx context.Context, cfg *EmbedConfig, text string) (*EmbedResult, error)
// EmbedBatch generates embeddings for multiple texts.
// More efficient than calling Embed in a loop (single API call).
// Max 100 texts per batch.
func EmbedBatch(ctx context.Context, cfg *EmbedConfig, texts []string) ([]*EmbedResult, error)
// EmbedRequest creates and stores embedding for a request entry.
// Idempotent: skips if embedding exists and text_hash matches.
func EmbedRequest(ctx context.Context, db *sql.DB, cfg *EmbedConfig, requestID string) error
// EmbedAnswer creates and stores embedding(s) for an answer entry.
// May produce multiple chunks for long answers.
// Idempotent: skips chunks where text_hash matches.
func EmbedAnswer(ctx context.Context, db *sql.DB, cfg *EmbedConfig, answerID string) error
// MatchRequestToAnswers finds published answers matching the request.
// Returns matches above the workstream's configured threshold.
// Respects cross-workstream config and RBAC.
func MatchRequestToAnswers(ctx context.Context, db *sql.DB, actorID, requestID string) ([]MatchResult, error)
// MatchAnswerToRequests finds open requests matching the answer.
// Returns matches above the workstream's configured threshold.
// Respects cross-workstream config and RBAC.
func MatchAnswerToRequests(ctx context.Context, db *sql.DB, actorID, answerID string) ([]MatchResult, error)
// FindDuplicateRequests finds existing requests similar to the given text.
// Used for deduplication suggestions before/after submission.
func FindDuplicateRequests(ctx context.Context, db *sql.DB, actorID, workstreamID, text string) ([]MatchResult, error)
// CosineSimilarity computes similarity between two vectors.
func CosineSimilarity(a, b []float32) float32
// DeleteEmbeddings removes all embeddings for an entry.
// Called when entry is deleted.
func DeleteEmbeddings(ctx context.Context, db *sql.DB, entryID string) error
// RefreshEmbedding re-embeds an entry if content changed.
// Compares text_hash to detect changes.
// Returns true if embedding was updated.
func RefreshEmbedding(ctx context.Context, db *sql.DB, cfg *EmbedConfig, entryID string) (bool, error)
```
### 11.2 Async Embedding on Publish
Embedding should not block the user action. Use async processing:
```go
// EmbedQueue is a background worker that processes embedding requests.
type EmbedQueue struct {
	cfg   *EmbedConfig
	db    *sql.DB
	queue chan embedJob
	wg    sync.WaitGroup
}

type embedJob struct {
	EntryID   string
	EntryType string // "request" | "answer"
	Priority  int    // 0 = normal, 1 = high (new request needs matching)
}
// Start begins processing the embedding queue.
// Workers defaults to 2 (Fireworks rate limit friendly).
func (q *EmbedQueue) Start(workers int)
// Stop gracefully shuts down the queue.
func (q *EmbedQueue) Stop()
// Enqueue adds an entry for embedding.
// Non-blocking; returns immediately.
func (q *EmbedQueue) Enqueue(entryID, entryType string, priority int)
```
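The Start/Enqueue/Stop lifecycle can be sketched with a buffered channel and a WaitGroup. This simplified version injects the work function instead of calling `EmbedRequest`/`EmbedAnswer`, so it runs without a database; the types shadow the declarations above in reduced form:

```go
import "sync"

type embedJob struct {
	EntryID   string
	EntryType string
	Priority  int
}

// EmbedQueue is a minimal channel-backed worker pool.
type EmbedQueue struct {
	queue chan embedJob
	work  func(embedJob)
	wg    sync.WaitGroup
}

func NewEmbedQueue(buffer int, work func(embedJob)) *EmbedQueue {
	return &EmbedQueue{queue: make(chan embedJob, buffer), work: work}
}

// Start launches the worker goroutines; each drains jobs until Stop.
func (q *EmbedQueue) Start(workers int) {
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for job := range q.queue {
				q.work(job)
			}
		}()
	}
}

// Stop closes the queue and waits for in-flight jobs to finish.
func (q *EmbedQueue) Stop() {
	close(q.queue)
	q.wg.Wait()
}

// Enqueue is non-blocking while the buffer has spare capacity.
func (q *EmbedQueue) Enqueue(entryID, entryType string, priority int) {
	q.queue <- embedJob{EntryID: entryID, EntryType: entryType, Priority: priority}
}
```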
### 11.3 Integration Points
**On Request Create:**
```go
func HandleCreateRequest(w http.ResponseWriter, r *http.Request) {
	// ... validation, RBAC, insert entry ...

	// Queue embedding (non-blocking)
	embedQueue.Enqueue(request.ID, "request", 1) // high priority

	// Return success immediately
	respondJSON(w, request)
}
```
**On Answer Publish:**
```go
func HandlePublishAnswer(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	err := db.Transaction(func(tx *sql.Tx) error {
		// ... update status, stage ...

		// Embedding happens inline for matching (within transaction timeout)
		if err := EmbedAnswer(ctx, tx, cfg, answer.ID); err != nil {
			// Log but don't fail — matching can happen later
			log.Warn("embedding failed, will retry", "error", err)
		}
		// Match and create answer_links
		matches, err := MatchAnswerToRequests(ctx, tx, actorID, answer.ID)
		if err != nil {
			return err
		}
		for _, m := range matches {
			if err := insertAnswerLink(tx, answer.ID, m.EntryID, "pending", m.Score); err != nil {
				return err
			}
		}
		// Broadcast confirmed links
		return BroadcastAnswer(ctx, tx, answer.ID)
	})
	if err != nil {
		respondError(w, err)
		return
	}
	respondJSON(w, answer)
}
```
### 11.4 Error Handling
```go
var (
	ErrTextTooLong      = errors.New("text exceeds model context limit")
	ErrEmptyText        = errors.New("text is empty or whitespace only")
	ErrEmbeddingFailed  = errors.New("embedding API call failed")
	ErrRateLimited      = errors.New("embedding API rate limited")
	ErrNoEmbedding      = errors.New("entry has no embedding")
	ErrWorkstreamConfig = errors.New("workstream missing match configuration")
)
```
Retry policy for transient errors:
- `ErrRateLimited`: exponential backoff, max 3 retries
- `ErrEmbeddingFailed`: retry once after 1s
- All others: fail immediately
---
## 12. answer_links Table (Updated)
Incorporates SPEC-REVIEW feedback on rejection tracking:
```sql
CREATE TABLE answer_links (
answer_id TEXT NOT NULL REFERENCES entries(entry_id),
request_id TEXT NOT NULL REFERENCES entries(entry_id),
-- Who created the link
linked_by TEXT NOT NULL,
linked_at INTEGER NOT NULL,
-- AI matching metadata
ai_score REAL, -- cosine similarity at time of match
ai_model TEXT, -- model used for matching
-- Review status
status TEXT NOT NULL DEFAULT 'pending',
-- 'pending': AI suggested, awaiting human review
-- 'confirmed': IB confirmed the match
-- 'rejected': IB rejected the match
-- 'self_confirmed': requester accepted dedup suggestion
-- 'rejected_by_requester': requester declined dedup suggestion
reviewed_by TEXT, -- who reviewed (if status != pending)
reviewed_at INTEGER, -- when reviewed
reject_reason TEXT, -- why rejected (if status = rejected)
PRIMARY KEY (answer_id, request_id)
);
CREATE INDEX idx_links_answer ON answer_links(answer_id);
CREATE INDEX idx_links_request ON answer_links(request_id);
CREATE INDEX idx_links_status ON answer_links(status);
```
---
## 13. Summary: What Gets Built
| Component | Location | Purpose |
|-----------|----------|---------|
| `lib/embed.go` | Core embedding logic | API calls, similarity, storage |
| `embeddings` table | Schema | Vector storage |
| `broadcasts` table | Schema | Idempotency |
| `answer_links` | Schema update | Status + rejection tracking |
| `embed_audit` table | Schema | Compliance logging |
| `EmbedQueue` | Background worker | Async processing |
| Workstream config | Entry.Data | Per-workstream thresholds |
**Not built (future):**
- Qdrant integration (interface defined, impl deferred)
- File content embedding (privacy: out of scope)
- Auto-confirm (threshold defined, feature disabled for MVP)
---
*This document extends SPEC.md. If conflicts exist, discuss before implementing.*