clavitor/clavitor.ai/tlw.go

310 lines
10 KiB
Go

// Two-Level Write (TLW) — writes to corporate.db AND Paddle in one call.
//
// NOTE: This is a minimal subset of admin/sync.go for the public web server's
// onboarding flow. Both files share the same corporate.db and Paddle API.
// When the public web server and admin server merge, this file goes away and
// admin/sync.go becomes the canonical implementation.
package main
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
)
// corpDB is the connection to corporate.db (customers, addresses, subscriptions).
// It's separate from `db` (clavitor.db, which holds POPs/telemetry/etc).
var corpDB *sql.DB
// tlwSandbox is true when we should hit the Paddle sandbox API instead of live.
// Auto-detected from CLAVITOR_PADDLE_LIVE env var (default: sandbox).
var tlwSandbox = os.Getenv("CLAVITOR_PADDLE_LIVE") != "1"
// paddleAPIKey is loaded from PADDLE_API_KEY env var. Empty = TLW writes to DB
// only and skips Paddle (useful for local dev without sandbox credentials).
var paddleAPIKey = os.Getenv("PADDLE_API_KEY")
// initTLW opens corporate.db and configures the TLW.
// Refuses to create the file or the schema — both must already exist.
// If either is missing, returns an error and onboarding stays disabled.
// Called from main() after the regular db is opened.
func initTLW() error {
// `mode=rw` makes sqlite refuse to create a missing file (otherwise
// the driver would silently create an empty one — exactly the
// foot-gun we want to avoid).
if _, err := os.Stat("corporate.db"); err != nil {
return fmt.Errorf("corporate.db not found (must be provisioned externally): %w", err)
}
var err error
corpDB, err = sql.Open("sqlite3", "corporate.db?mode=rw&_busy_timeout=5000")
if err != nil {
return fmt.Errorf("open corporate.db: %w", err)
}
if err := corpDB.Ping(); err != nil {
return fmt.Errorf("ping corporate.db: %w", err)
}
// Verify the schema is present. We do not create it.
var hasCustomers int
corpDB.QueryRow(`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='customers'`).Scan(&hasCustomers)
if hasCustomers == 0 {
corpDB.Close()
corpDB = nil
return fmt.Errorf("corporate.db is missing the customers table — schema must be applied externally")
}
if tlwSandbox {
log.Println("TLW: Paddle SANDBOX mode")
} else {
log.Println("TLW: Paddle LIVE mode")
}
if paddleAPIKey == "" {
log.Println("TLW: PADDLE_API_KEY not set — DB-only mode (Paddle calls skipped)")
}
return nil
}
// paddleBaseURL returns the API base URL based on sandbox mode.
func paddleBaseURL() string {
if tlwSandbox {
return "https://sandbox-api.paddle.com"
}
return "https://api.paddle.com"
}
// tlwResult is what every TLW Create call returns.
type tlwResult struct {
OurID string // Our internal ID (we generate this)
PaddleID string // Paddle's returned ID (empty if Paddle skipped or failed)
Synced bool // True if Paddle accepted the write
Err error // DB error or last Paddle error
}
// tlwCreateCustomer writes a new customer to corporate.db and Paddle.
// Required fields in data: email. Optional: name, locale, marketing_consent.
func tlwCreateCustomer(data map[string]interface{}) tlwResult {
if data["email"] == nil || data["email"] == "" {
return tlwResult{Err: fmt.Errorf("email required")}
}
now := time.Now().Unix()
ourID := fmt.Sprintf("ctm_%d", time.Now().UnixNano())
row := map[string]interface{}{
"id": ourID,
"email": data["email"],
"name": data["name"],
"locale": firstNonEmpty(data["locale"], "en"),
"status": "active",
"level": 1,
"created_at": now,
"updated_at": now,
"sync_status": "pending",
"pending_since": now,
}
if err := tlwInsert("customers", row); err != nil {
return tlwResult{OurID: ourID, Err: fmt.Errorf("db insert: %w", err)}
}
// Push to Paddle
if paddleAPIKey == "" {
return tlwResult{OurID: ourID, Synced: false}
}
payload := map[string]interface{}{
"email": data["email"],
"locale": row["locale"],
}
if data["name"] != nil && data["name"] != "" {
payload["name"] = data["name"]
}
resp, err := paddleCall("POST", "/customers", payload)
if err != nil {
corpDB.Exec(`UPDATE customers SET sync_status='error', sync_error=? WHERE id=?`, err.Error(), ourID)
return tlwResult{OurID: ourID, Err: err}
}
paddleID := extractPaddleID(resp)
corpDB.Exec(`UPDATE customers SET paddle_id=?, sync_status='synced', sync_error=NULL, last_paddle_sync_at=? WHERE id=?`,
paddleID, time.Now().Unix(), ourID)
return tlwResult{OurID: ourID, PaddleID: paddleID, Synced: true}
}
// tlwCreateAddress writes a billing address tied to a customer.
// Required: customer_id, country_code. Optional: city, region, zip_code, description.
func tlwCreateAddress(data map[string]interface{}) tlwResult {
if data["customer_id"] == nil || data["customer_id"] == "" {
return tlwResult{Err: fmt.Errorf("customer_id required")}
}
if data["country_code"] == nil || data["country_code"] == "" {
return tlwResult{Err: fmt.Errorf("country_code required")}
}
now := time.Now().Unix()
ourID := fmt.Sprintf("add_%d", time.Now().UnixNano())
row := map[string]interface{}{
"id": ourID,
"customer_id": data["customer_id"],
"description": data["description"],
"city": data["city"],
"region": data["region"],
"country_code": data["country_code"],
"zip_code": data["zip_code"],
"status": "active",
"created_at": now,
"updated_at": now,
"sync_status": "pending",
"pending_since": now,
}
if err := tlwInsert("addresses", row); err != nil {
return tlwResult{OurID: ourID, Err: fmt.Errorf("db insert: %w", err)}
}
if paddleAPIKey == "" {
return tlwResult{OurID: ourID, Synced: false}
}
// Need the Paddle customer ID to call /customers/{id}/addresses
var paddleCustomerID string
corpDB.QueryRow(`SELECT paddle_id FROM customers WHERE id = ?`, data["customer_id"]).Scan(&paddleCustomerID)
if paddleCustomerID == "" {
corpDB.Exec(`UPDATE addresses SET sync_status='error', sync_error=? WHERE id=?`,
"customer not synced to paddle yet", ourID)
return tlwResult{OurID: ourID, Err: fmt.Errorf("customer has no paddle_id yet")}
}
payload := map[string]interface{}{
"country_code": data["country_code"],
}
for _, k := range []string{"city", "region", "zip_code", "description"} {
if v, ok := data[k]; ok && v != "" {
payload[k] = v
}
}
resp, err := paddleCall("POST", "/customers/"+paddleCustomerID+"/addresses", payload)
if err != nil {
corpDB.Exec(`UPDATE addresses SET sync_status='error', sync_error=? WHERE id=?`, err.Error(), ourID)
return tlwResult{OurID: ourID, Err: err}
}
paddleID := extractPaddleID(resp)
corpDB.Exec(`UPDATE addresses SET paddle_id=?, sync_status='synced', sync_error=NULL, last_paddle_sync_at=? WHERE id=?`,
paddleID, time.Now().Unix(), ourID)
return tlwResult{OurID: ourID, PaddleID: paddleID, Synced: true}
}
// tlwCreateTransaction creates a Paddle transaction (used to generate a checkout URL).
// Returns the Paddle transaction ID and the hosted checkout URL.
// Unlike customer/address, transactions are not stored in corporate.db until the
// webhook fires — we just need the checkout URL to redirect the user.
func tlwCreateTransaction(paddleCustomerID, paddleAddressID, priceID string) (txnID, checkoutURL string, err error) {
if paddleAPIKey == "" {
// Dev/local mode: synthesize a placeholder
return "txn_local_dev", "/onboarding/done?dev=1", nil
}
payload := map[string]interface{}{
"customer_id": paddleCustomerID,
"address_id": paddleAddressID,
"items": []map[string]interface{}{
{"price_id": priceID, "quantity": 1},
},
"collection_mode": "automatic",
}
resp, err := paddleCall("POST", "/transactions", payload)
if err != nil {
return "", "", err
}
if data, ok := resp["data"].(map[string]interface{}); ok {
if id, ok := data["id"].(string); ok {
txnID = id
}
if cp, ok := data["checkout"].(map[string]interface{}); ok {
if u, ok := cp["url"].(string); ok {
checkoutURL = u
}
}
}
if txnID == "" {
return "", "", fmt.Errorf("paddle did not return transaction id")
}
return txnID, checkoutURL, nil
}
// findCustomerByEmail returns the local customer ID and Paddle ID (if any).
// Used to identify accounts by email — the primary lookup.
func findCustomerByEmail(email string) (ourID, paddleID string, found bool) {
row := corpDB.QueryRow(`SELECT id, COALESCE(paddle_id,'') FROM customers WHERE email = ? LIMIT 1`, email)
if err := row.Scan(&ourID, &paddleID); err != nil {
return "", "", false
}
return ourID, paddleID, true
}
// --- private helpers ---
func tlwInsert(table string, data map[string]interface{}) error {
cols := []string{}
placeholders := []string{}
values := []interface{}{}
for k, v := range data {
if v == nil {
continue
}
cols = append(cols, k)
placeholders = append(placeholders, "?")
values = append(values, v)
}
stmt := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
table, strings.Join(cols, ","), strings.Join(placeholders, ","))
_, err := corpDB.Exec(stmt, values...)
return err
}
func paddleCall(method, endpoint string, payload map[string]interface{}) (map[string]interface{}, error) {
url := paddleBaseURL() + endpoint
body, _ := json.Marshal(payload)
req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+paddleAPIKey)
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("paddle %s %s -> %d: %s", method, endpoint, resp.StatusCode, string(respBody))
}
var result map[string]interface{}
json.Unmarshal(respBody, &result)
return result, nil
}
func extractPaddleID(resp map[string]interface{}) string {
if data, ok := resp["data"].(map[string]interface{}); ok {
if id, ok := data["id"].(string); ok {
return id
}
}
return ""
}
func firstNonEmpty(vals ...interface{}) interface{} {
for _, v := range vals {
if v != nil && v != "" {
return v
}
}
return ""
}