// clavitor/clavitor.ai/admin/sync.go
// Clavitor Generic Dual-Write Abstraction Layer
// JSON in → writes to our DB + Paddle API automatically
package main
import (
	"bytes"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"strings"
	"time"
)
// isProductionServer reports whether this process is running on the
// production clavitor.ai host, determined by comparing the host's local
// IPv4 interface addresses against the DNS records for clavitor.ai.
// Any DNS or interface-enumeration failure is treated as "not production".
func isProductionServer() bool {
	prodIPs, err := net.LookupIP("clavitor.ai")
	if err != nil || len(prodIPs) == 0 {
		return false
	}
	localAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return false
	}
	for _, la := range localAddrs {
		ipnet, ok := la.(*net.IPNet)
		// Only non-loopback IPv4 addresses are considered.
		if !ok || ipnet.IP.IsLoopback() || ipnet.IP.To4() == nil {
			continue
		}
		for _, candidate := range prodIPs {
			if candidate.Equal(ipnet.IP) {
				return true
			}
		}
	}
	return false
}
// EntityMap defines how each entity maps between our DB and Paddle.
// One instance per entity type lives in EntityRegistry.
type EntityMap struct {
	PaddleEndpoint string            // Paddle API path, e.g. "/customers"; may contain {placeholders} resolved from the record's fields
	IDPrefix       string            // prefix for our generated internal IDs, e.g. "ctm"
	DBTable        string            // local table name, e.g. "customers"
	FieldMap       map[string]string // our_field → paddle_field (empty paddle name means "do not send")
	RequiredFields []string          // fields that must be present in input before a create is attempted
}
// Registry of all known entities.
// Entities with a non-empty FieldMap keep our internal "id" local (mapped to
// ""), and store Paddle's "id" under our "paddle_id" column.
var EntityRegistry = map[string]EntityMap{
	"customers": {
		PaddleEndpoint: "/customers",
		IDPrefix:       "ctm",
		DBTable:        "customers",
		FieldMap: map[string]string{
			"id":         "",           // we generate this; never sent to Paddle
			"paddle_id":  "id",         // store Paddle's ID separately
			"created_at": "created_at", // timestamps pass through unchanged
			"updated_at": "updated_at",
		},
		RequiredFields: []string{"email"},
	},
	// NOTE(review): addresses/businesses use path placeholders; customer_id
	// must be present in the input data for resolveEndpoint to substitute it.
	"addresses": {
		PaddleEndpoint: "/customers/{customer_id}/addresses",
		IDPrefix:       "add",
		DBTable:        "addresses",
		FieldMap:       map[string]string{},
		RequiredFields: []string{"customer_id", "country_code"},
	},
	"businesses": {
		PaddleEndpoint: "/customers/{customer_id}/businesses",
		IDPrefix:       "biz",
		DBTable:        "businesses",
		FieldMap:       map[string]string{},
		RequiredFields: []string{"customer_id", "name"},
	},
	"subscriptions": {
		PaddleEndpoint: "/subscriptions",
		IDPrefix:       "sub",
		DBTable:        "subscriptions",
		FieldMap: map[string]string{
			"id":         "",
			"paddle_id":  "id",
			"created_at": "created_at",
			"updated_at": "updated_at",
		},
		RequiredFields: []string{"customer_id", "address_id", "items"},
	},
	"transactions": {
		PaddleEndpoint: "/transactions",
		IDPrefix:       "txn",
		DBTable:        "transactions",
		FieldMap:       map[string]string{},
		RequiredFields: []string{"customer_id", "address_id", "items"},
	},
	"discounts": {
		PaddleEndpoint: "/discounts",
		IDPrefix:       "dsc",
		DBTable:        "discounts",
		FieldMap: map[string]string{
			"id":         "",
			"paddle_id":  "id",
			"created_at": "created_at",
			"updated_at": "updated_at",
		},
		RequiredFields: []string{"description", "type", "amount"},
	},
}
// SyncLayer performs dual writes: every create/update is written to our own
// DB first (source of truth) and then mirrored to the Paddle billing API,
// with sync_status bookkeeping so failed Paddle calls can be retried.
type SyncLayer struct {
	apiKey     string       // Paddle API key, sent as a Bearer token
	baseURL    string       // Paddle API root (live or sandbox, chosen at construction)
	db         *sql.DB      // local database handle
	httpClient *http.Client // shared HTTP client with a 30s timeout
}
// DefaultAPIKey is the fallback Paddle API key used when NewSyncLayer is
// given an empty key. It prefers the PADDLE_API_KEY environment variable;
// only when that is unset does it fall back to the embedded sandbox key.
// For sandbox keys see: https://sandbox-vendors.paddle.com/authentication
//
// SECURITY: the embedded fallback is a sandbox credential only. Never embed
// a live key in source — supply production credentials via PADDLE_API_KEY.
var DefaultAPIKey = defaultAPIKey()

// defaultAPIKey resolves the key from the environment, falling back to the
// checked-in sandbox credential.
func defaultAPIKey() string {
	if key := os.Getenv("PADDLE_API_KEY"); key != "" {
		return key
	}
	return "pdl_sdbx_apikey_01knegw36v6cvybp2y5652xpmq_weT2XzhV6Qk0rGEYDY0V5X_Aig"
}
// NewSyncLayer creates a sync layer with auto-detected sandbox mode.
// An empty apiKey falls back to DefaultAPIKey. The environment is chosen by
// isProductionServer(): only the production host talks to the live Paddle
// API; everything else uses the sandbox.
func NewSyncLayer(apiKey string, db *sql.DB) *SyncLayer {
	key := apiKey
	if key == "" {
		key = DefaultAPIKey
	}
	// Auto-detect environment: default to sandbox unless we are on prod.
	baseURL := "https://sandbox-api.paddle.com"
	if isProductionServer() {
		baseURL = "https://api.paddle.com"
		fmt.Println("Using PADDLE LIVE environment")
	} else {
		fmt.Println("Using PADDLE SANDBOX environment")
	}
	return &SyncLayer{
		apiKey:     key,
		baseURL:    baseURL,
		db:         db,
		httpClient: &http.Client{Timeout: 30 * time.Second},
	}
}
// SyncResult is the outcome of a dual-write operation. Success is true only
// when both the local DB write and the Paddle call completed; on partial
// failure the populated ID fields indicate how far the write progressed.
type SyncResult struct {
	Success     bool
	OurID       string                 // our internal ID (generated locally)
	PaddleID    string                 // Paddle's returned ID (empty if the Paddle call failed)
	Entity      string                 // entity type, a key of EntityRegistry
	Data        map[string]interface{} // final merged data (our fields + Paddle's response)
	DBError     error                  // local-database side failure, if any
	PaddleError error                  // Paddle API side failure, if any
}
// Create syncs a new entity to both our DB and Paddle.
//
// Dual-write order: validate → write locally (source of truth) → POST to
// Paddle → record Paddle's ID locally → merge Paddle's response back. A
// Paddle failure leaves the row marked sync_status='error' for the retry
// sweep; a DB failure after Paddle succeeded is reported in DBError with
// the orphaned Paddle ID so it can be reconciled.
func (s *SyncLayer) Create(entity string, data map[string]interface{}) SyncResult {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("unknown entity: %s", entity)}
	}
	// 1. Validate required fields before touching either system.
	for _, field := range mapping.RequiredFields {
		if _, ok := data[field]; !ok {
			return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("missing required field: %s", field)}
		}
	}
	// 2. Generate our internal ID and stamp sync bookkeeping fields.
	now := time.Now()
	ourID := fmt.Sprintf("%s_%d", mapping.IDPrefix, now.UnixNano())
	data["id"] = ourID
	data["created_at"] = now.Unix()
	data["updated_at"] = now.Unix()
	data["sync_status"] = "pending"
	data["pending_since"] = now.Unix() // Track for retry logic
	// 3. Build Paddle payload (map our fields to Paddle's if needed).
	paddlePayload := s.toPaddlePayload(data, mapping.FieldMap)
	// 4. Write to our DB first (always local first).
	if err := s.insertToDB(mapping.DBTable, data); err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: err}
	}
	// 5. Determine endpoint (handle path params like {customer_id}).
	endpoint := s.resolveEndpoint(mapping.PaddleEndpoint, data)
	// 6. Call Paddle API.
	paddleResp, err := s.callPaddle("POST", endpoint, paddlePayload)
	if err != nil {
		// Record the error; the retry sweep will pick this row up later.
		s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'error', sync_error = ? WHERE id = ?", mapping.DBTable),
			err.Error(), ourID)
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: err}
	}
	// 7. Extract Paddle ID from response.
	paddleID := s.extractID(paddleResp)
	if paddleID == "" {
		// Strange — Paddle accepted the request but returned no ID.
		s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'error', sync_error = ? WHERE id = ?", mapping.DBTable),
			"paddle did not return id", ourID)
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: fmt.Errorf("no id in paddle response")}
	}
	// 8. Update our DB with Paddle ID and synced status.
	_, dbErr := s.db.Exec(fmt.Sprintf("UPDATE %s SET paddle_id = ?, sync_status = 'synced', sync_error = NULL WHERE id = ?", mapping.DBTable),
		paddleID, ourID)
	if dbErr != nil {
		// Worst case: Paddle created the entity but we lost its ID.
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleID: paddleID,
			DBError: fmt.Errorf("paddle created (%s) but DB update failed: %v", paddleID, dbErr)}
	}
	// 9. Merge Paddle's response data back into our record (IDs, timestamps,
	// computed fields).
	mergedData := s.mergePaddleResponse(data, paddleResp, mapping.FieldMap)
	// BUG FIX: the synced-state fields must be applied to mergedData as well
	// as data. Previously they were set only on data AFTER mergedData was
	// built, so step 10 wrote the stale sync_status='pending' (and non-nil
	// pending_since) back over the 'synced' status recorded in step 8.
	for _, m := range []map[string]interface{}{data, mergedData} {
		m["paddle_id"] = paddleID
		m["sync_status"] = "synced"
		m["last_paddle_sync_at"] = now.Unix()
		m["pending_since"] = nil // clear pending since we're synced
	}
	// 10. Persist the merged data (Paddle may have added/modified fields).
	if err := s.updateDBWithMergedData(mapping.DBTable, ourID, mergedData); err != nil {
		// Log but don't fail — the core record is already synced.
		fmt.Printf("Warning: failed to update merged data for %s: %v\n", ourID, err)
	}
	return SyncResult{Success: true, Entity: entity, OurID: ourID, PaddleID: paddleID, Data: mergedData}
}
// Update syncs changes to both systems: local DB first, then a PATCH to
// Paddle using the record's stored paddle_id. A record that has never been
// synced (no paddle_id) cannot be updated through this path.
func (s *SyncLayer) Update(entity string, ourID string, data map[string]interface{}) SyncResult {
	// Guard the registry lookup — a zero-value EntityMap would otherwise
	// produce SQL against an empty table name.
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: fmt.Errorf("unknown entity: %s", entity)}
	}
	// Look up the Paddle ID. Scan into sql.NullString: paddle_id is NULL for
	// rows that never finished syncing, and a plain string scan would fail.
	var paddleID sql.NullString
	err := s.db.QueryRow(fmt.Sprintf("SELECT paddle_id FROM %s WHERE id = ?", mapping.DBTable), ourID).Scan(&paddleID)
	if err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID,
			DBError: fmt.Errorf("looking up %s: %w", ourID, err)}
	}
	if !paddleID.Valid || paddleID.String == "" {
		return SyncResult{Success: false, Entity: entity, OurID: ourID,
			PaddleError: fmt.Errorf("no paddle_id found, cannot update")}
	}
	// Update our DB first.
	data["updated_at"] = time.Now().Unix()
	if err := s.updateDB(mapping.DBTable, ourID, data); err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: err}
	}
	// Then mirror the change to Paddle.
	paddlePayload := s.toPaddlePayload(data, mapping.FieldMap)
	endpoint := mapping.PaddleEndpoint + "/" + paddleID.String
	if _, err := s.callPaddle("PATCH", endpoint, paddlePayload); err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleID: paddleID.String, PaddleError: err}
	}
	return SyncResult{Success: true, Entity: entity, OurID: ourID, PaddleID: paddleID.String}
}
// RetryAll sweeps every registered entity and re-queues records whose sync
// has been pending or failed for more than five minutes.
func (s *SyncLayer) RetryAll() {
	const staleMinutes = 5
	for entity := range EntityRegistry {
		stale, err := s.FindStalePending(entity, staleMinutes)
		if err != nil {
			// Skip entities we cannot query; the next sweep will retry.
			continue
		}
		for _, staleID := range stale {
			s.retryEntity(entity, staleID)
		}
	}
}
// RetryImmediate retries a specific entity immediately (for manual retry).
// It resets the record to 'pending' with a fresh pending_since timestamp,
// then invokes the regular retry path.
func (s *SyncLayer) RetryImmediate(entity, ourID string) {
	// Guard the registry lookup — the unchecked access previously produced
	// "UPDATE  SET ..." (empty table name) for unknown entities.
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return
	}
	// Reset status to trigger retry. Best-effort: an Exec failure just means
	// the record keeps its current status until the next sweep.
	s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'pending', pending_since = ? WHERE id = ?", mapping.DBTable),
		time.Now().Unix(), ourID)
	// Use the regular retry.
	s.retryEntity(entity, ourID)
}
// Private helpers
// insertToDB inserts one record into table, using the map's keys as column
// names and its values as placeholder-bound parameters.
//
// NOTE(review): column names are interpolated into the SQL string — they
// must only ever come from trusted, internal field names, never from
// user-controlled input.
func (s *SyncLayer) insertToDB(table string, data map[string]interface{}) error {
	if len(data) == 0 {
		// An empty map would generate "INSERT INTO t () VALUES ()".
		return fmt.Errorf("no columns to insert into %s", table)
	}
	columns := make([]string, 0, len(data))
	placeholders := make([]string, 0, len(data))
	values := make([]interface{}, 0, len(data))
	for col, val := range data {
		columns = append(columns, col)
		placeholders = append(placeholders, "?")
		values = append(values, val)
	}
	// Renamed from `sql` — the old name shadowed the database/sql package.
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		table,
		strings.Join(columns, ", "),
		strings.Join(placeholders, ", "))
	_, err := s.db.Exec(query, values...)
	return err
}
// updateDB updates the row identified by id in table, setting each key of
// data to its value via bound parameters. Column names are interpolated —
// they must come only from trusted, internal field names.
func (s *SyncLayer) updateDB(table string, id string, data map[string]interface{}) error {
	if len(data) == 0 {
		// Nothing to set; avoid generating the malformed "UPDATE t SET WHERE".
		return nil
	}
	sets := make([]string, 0, len(data))
	values := make([]interface{}, 0, len(data)+1)
	for col, val := range data {
		sets = append(sets, fmt.Sprintf("%s = ?", col))
		values = append(values, val)
	}
	values = append(values, id)
	// Renamed from `sql` — the old name shadowed the database/sql package.
	query := fmt.Sprintf("UPDATE %s SET %s WHERE id = ?",
		table,
		strings.Join(sets, ", "))
	_, err := s.db.Exec(query, values...)
	return err
}
// toPaddlePayload builds the outbound Paddle request body from our record:
// internal bookkeeping fields are stripped, and any remaining field whose
// name differs on Paddle's side is renamed via fieldMap.
func (s *SyncLayer) toPaddlePayload(data map[string]interface{}, fieldMap map[string]string) map[string]interface{} {
	payload := make(map[string]interface{}, len(data))
	for field, value := range data {
		// Internal bookkeeping fields never leave our system.
		switch field {
		case "id", "paddle_id", "sync_status", "sync_error",
			"created_at", "updated_at", "pending_since",
			"last_paddle_sync_at", "paddle_event_id", "internal_notes":
			continue
		}
		// Translate the field name when a non-empty mapping exists.
		name := field
		if mapped, ok := fieldMap[field]; ok && mapped != "" {
			name = mapped
		}
		payload[name] = value
	}
	return payload
}
// resolveEndpoint substitutes {placeholder} segments in the endpoint path
// with matching string-valued fields from data (e.g. {customer_id}).
// Non-string values are skipped; unmatched placeholders are left as-is.
func (s *SyncLayer) resolveEndpoint(endpoint string, data map[string]interface{}) string {
	resolved := endpoint
	for key, val := range data {
		str, ok := val.(string)
		if !ok {
			continue
		}
		resolved = strings.ReplaceAll(resolved, "{"+key+"}", str)
	}
	return resolved
}
// callPaddle sends one JSON request to the Paddle API and decodes the JSON
// response. A status >= 400 is returned as an error containing the response
// body. All previously-ignored errors (Marshal, ReadAll, Unmarshal) are now
// surfaced so callers don't operate on silently-empty responses.
func (s *SyncLayer) callPaddle(method, endpoint string, payload map[string]interface{}) (map[string]interface{}, error) {
	url := s.baseURL + endpoint
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("encoding paddle payload: %w", err)
	}
	req, err := http.NewRequest(method, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+s.apiKey)
	req.Header.Set("Content-Type", "application/json")
	resp, err := s.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading paddle response: %w", err)
	}
	if resp.StatusCode >= 400 {
		return nil, fmt.Errorf("paddle %s %d: %s", method, resp.StatusCode, string(respBody))
	}
	var result map[string]interface{}
	if err := json.Unmarshal(respBody, &result); err != nil {
		return nil, fmt.Errorf("decoding paddle response: %w", err)
	}
	return result, nil
}
// extractID pulls the entity ID out of a Paddle response. Paddle wraps
// payloads in a "data" envelope: {"data": {"id": "..."}}. Returns "" when
// the envelope or the id is absent or not a string.
func (s *SyncLayer) extractID(response map[string]interface{}) string {
	data, ok := response["data"].(map[string]interface{})
	if !ok {
		return ""
	}
	id, _ := data["id"].(string)
	return id
}
// retryEntity re-queues a single record by resetting its sync status.
//
// NOTE(review): this only flips sync_status back to 'pending' — it does NOT
// re-issue the Paddle API call. Nothing in this file re-sends pending rows,
// so confirm a separate worker consumes 'pending' records; otherwise
// RetryAll/RetryImmediate are bookkeeping-only. The Exec error is
// deliberately ignored (best-effort; next sweep retries).
func (s *SyncLayer) retryEntity(entity, ourID string) {
	// Simplified: just mark for retry by resetting status
	mapping := EntityRegistry[entity]
	s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'pending' WHERE id = ?", mapping.DBTable), ourID)
}
// mergePaddleResponse overlays Paddle's response data onto our original
// record. Paddle field names are translated back to ours via the inverse of
// fieldMap; our internal fields (id, sync_status, sync_error) are never
// overwritten by Paddle data.
func (s *SyncLayer) mergePaddleResponse(ourData map[string]interface{}, paddleResp map[string]interface{}, fieldMap map[string]string) map[string]interface{} {
	merged := make(map[string]interface{}, len(ourData))
	for field, value := range ourData {
		merged[field] = value
	}
	// Unwrap Paddle's {"data": {...}} envelope when present; some responses
	// arrive unwrapped.
	payload := paddleResp
	if inner, ok := paddleResp["data"].(map[string]interface{}); ok {
		payload = inner
	}
	// Invert the forward mapping (paddle_field → our_field), skipping
	// fields mapped to "" (those are never sent, so never come back mapped).
	back := make(map[string]string, len(fieldMap))
	for ours, theirs := range fieldMap {
		if theirs != "" {
			back[theirs] = ours
		}
	}
	// Overlay Paddle's fields, renamed back into our schema.
	for theirs, value := range payload {
		ours := theirs
		if mapped, ok := back[theirs]; ok {
			ours = mapped
		}
		switch ours {
		case "id", "sync_status", "sync_error":
			continue // never let Paddle clobber internal fields
		}
		merged[ours] = value
	}
	return merged
}
// updateDBWithMergedData updates our DB record with merged data from Paddle.
// The "id" key is never written (it is the WHERE key). Column names are
// interpolated into the SQL — they must come only from trusted field names.
func (s *SyncLayer) updateDBWithMergedData(table, ourID string, mergedData map[string]interface{}) error {
	sets := make([]string, 0, len(mergedData))
	values := make([]interface{}, 0, len(mergedData)+1)
	for col, val := range mergedData {
		// Skip the ID field itself — it identifies the row.
		if col == "id" {
			continue
		}
		sets = append(sets, fmt.Sprintf("%s = ?", col))
		values = append(values, val)
	}
	if len(sets) == 0 {
		// Guard: an id-only map would otherwise generate the malformed
		// statement "UPDATE t SET  WHERE id = ?".
		return nil
	}
	values = append(values, ourID)
	// Renamed from `sql` — the old name shadowed the database/sql package.
	query := fmt.Sprintf("UPDATE %s SET %s WHERE id = ?",
		table,
		strings.Join(sets, ", "))
	_, err := s.db.Exec(query, values...)
	return err
}
// HandleWebhook processes inbound updates from Paddle (subscriptions, status
// changes, etc.) and updates our DB to match Paddle's state.
//
// Idempotency: when the payload carries an event_id matching the last one we
// recorded for the row (paddle_event_id), the webhook is silently skipped.
func (s *SyncLayer) HandleWebhook(entity string, paddleData map[string]interface{}) error {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return fmt.Errorf("unknown entity for webhook: %s", entity)
	}
	// Extract Paddle ID.
	paddleID, ok := paddleData["id"].(string)
	if !ok {
		return fmt.Errorf("no id in webhook data")
	}
	// Extract event ID if present (for idempotency).
	eventID, _ := paddleData["event_id"].(string)
	// Find our record by Paddle ID. BUG FIX: paddle_event_id is NULL for
	// rows that have never seen a webhook; scanning it into a plain string
	// failed and was misreported as "no local record". sql.NullString
	// tolerates the NULL.
	var ourID string
	var existingEventID sql.NullString
	err := s.db.QueryRow(
		fmt.Sprintf("SELECT id, paddle_event_id FROM %s WHERE paddle_id = ?", mapping.DBTable),
		paddleID,
	).Scan(&ourID, &existingEventID)
	if err != nil {
		return fmt.Errorf("no local record for paddle %s: %v", paddleID, err)
	}
	// Idempotency check: skip if we already processed this event.
	if eventID != "" && existingEventID.Valid && existingEventID.String == eventID {
		return nil
	}
	// Map Paddle fields to our fields.
	ourData := make(map[string]interface{}, len(paddleData))
	for paddleField, val := range paddleData {
		ourField := paddleField
		// Check if this Paddle field maps to one of ours.
		for ourF, paddleF := range mapping.FieldMap {
			if paddleF == paddleField && paddleF != "" {
				ourField = ourF
				break
			}
		}
		// Don't overwrite internal identifiers. event_id is envelope
		// metadata, stored separately as paddle_event_id below — copying
		// it through would target a nonexistent event_id column.
		if ourField == "id" || ourField == "paddle_id" || ourField == "event_id" {
			continue
		}
		ourData[ourField] = val
	}
	// Webhook data is authoritative: mark the row as synced.
	ourData["updated_at"] = time.Now().Unix()
	ourData["sync_status"] = "synced"
	ourData["sync_error"] = nil
	ourData["last_paddle_sync_at"] = time.Now().Unix()
	ourData["pending_since"] = nil
	if eventID != "" {
		ourData["paddle_event_id"] = eventID
	}
	return s.updateDBWithMergedData(mapping.DBTable, ourID, ourData)
}
// FindStalePending returns the IDs of records whose sync has been pending
// for more than the given number of minutes, based on the pending_since
// unix timestamp column.
func (s *SyncLayer) FindStalePending(entity string, minutes int) ([]string, error) {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return nil, fmt.Errorf("unknown entity: %s", entity)
	}
	cutoff := time.Now().Add(-time.Duration(minutes) * time.Minute).Unix()
	rows, err := s.db.Query(
		fmt.Sprintf("SELECT id FROM %s WHERE sync_status = 'pending' AND pending_since < ?", mapping.DBTable),
		cutoff,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var ids []string
	for rows.Next() {
		var id string
		// Previously ignored: a Scan failure silently appended "".
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	// Surface any iteration error (previously never checked).
	return ids, rows.Err()
}