// Clavitor Generic Dual-Write Abstraction Layer
// JSON in → writes to our DB + Paddle API automatically
package main

import (
	"bytes"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"reflect"
	"sort"
	"strings"
	"time"
)

// isProductionServer reports whether one of this machine's non-loopback IPv4
// interface addresses equals an address that "clavitor.ai" resolves to.
// Any lookup failure is treated as "not production".
func isProductionServer() bool {
	prodIPs, err := net.LookupIP("clavitor.ai")
	if err != nil || len(prodIPs) == 0 {
		return false
	}
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return false
	}
	for _, addr := range addrs {
		ipnet, ok := addr.(*net.IPNet)
		if !ok || ipnet.IP.IsLoopback() || ipnet.IP.To4() == nil {
			continue
		}
		for _, prodIP := range prodIPs {
			if ipnet.IP.Equal(prodIP) {
				return true
			}
		}
	}
	return false
}

// EntityMap defines how each entity maps between our DB and Paddle.
type EntityMap struct {
	PaddleEndpoint string            // e.g., "/customers"
	IDPrefix       string            // e.g., "ctm"
	DBTable        string            // e.g., "customers"
	FieldMap       map[string]string // our_field → paddle_field (if different)
	RequiredFields []string          // Must be present in input
}

// EntityRegistry is the registry of all known entities, keyed by entity name.
var EntityRegistry = map[string]EntityMap{
	"customers": {
		PaddleEndpoint: "/customers",
		IDPrefix:       "ctm",
		DBTable:        "customers",
		FieldMap: map[string]string{
			"id":         "",   // we generate this
			"paddle_id":  "id", // store Paddle's ID separately
			"created_at": "created_at",
			"updated_at": "updated_at",
		},
		RequiredFields: []string{"email"},
	},
	"addresses": {
		PaddleEndpoint: "/customers/{customer_id}/addresses",
		IDPrefix:       "add",
		DBTable:        "addresses",
		FieldMap:       map[string]string{},
		RequiredFields: []string{"customer_id", "country_code"},
	},
	"businesses": {
		PaddleEndpoint: "/customers/{customer_id}/businesses",
		IDPrefix:       "biz",
		DBTable:        "businesses",
		FieldMap:       map[string]string{},
		RequiredFields: []string{"customer_id", "name"},
	},
	"subscriptions": {
		PaddleEndpoint: "/subscriptions",
		IDPrefix:       "sub",
		DBTable:        "subscriptions",
		FieldMap: map[string]string{
			"id":         "",
			"paddle_id":  "id",
			"created_at": "created_at",
			"updated_at": "updated_at",
		},
		RequiredFields: []string{"customer_id", "address_id", "items"},
	},
	"transactions": {
		PaddleEndpoint: "/transactions",
		IDPrefix:       "txn",
		DBTable:        "transactions",
		FieldMap:       map[string]string{},
		RequiredFields: []string{"customer_id", "address_id", "items"},
	},
	"discounts": {
		PaddleEndpoint: "/discounts",
		IDPrefix:       "dsc",
		DBTable:        "discounts",
		FieldMap: map[string]string{
			"id":         "",
			"paddle_id":  "id",
			"created_at": "created_at",
			"updated_at": "updated_at",
		},
		RequiredFields: []string{"description", "type", "amount"},
	},
}

// SyncLayer performs dual writes: every change goes to our database and to
// the Paddle API.
type SyncLayer struct {
	apiKey     string
	baseURL    string
	db         *sql.DB
	httpClient *http.Client
}

// DefaultAPIKey is the fallback API key used when NewSyncLayer gets an empty key.
// In production load the key from the environment instead.
// For sandbox: get from https://sandbox-vendors.paddle.com/authentication
//
// NOTE(review): this is a credential committed to source control. It is kept
// only because the constant is exported; rotate it and move it out of the code.
const DefaultAPIKey = "pdl_sdbx_apikey_01knegw36v6cvybp2y5652xpmq_weT2XzhV6Qk0rGEYDY0V5X_Aig"

// NewSyncLayer creates a sync layer with auto-detected sandbox mode.
// It uses isProductionServer to pick the environment: the live Paddle API on
// the production server, the sandbox API everywhere else. An empty apiKey
// falls back to DefaultAPIKey.
func NewSyncLayer(apiKey string, db *sql.DB) *SyncLayer {
	if apiKey == "" {
		apiKey = DefaultAPIKey
	}

	// Auto-detect: if not on production server, use sandbox.
	sandbox := !isProductionServer()
	baseURL := "https://api.paddle.com"
	if sandbox {
		fmt.Println("Using PADDLE SANDBOX environment")
		baseURL = "https://sandbox-api.paddle.com"
	} else {
		fmt.Println("Using PADDLE LIVE environment")
		if apiKey == DefaultAPIKey {
			// The default key is a sandbox key, so live calls made with it
			// cannot be expected to authenticate.
			fmt.Println("Warning: PADDLE LIVE environment is using the default sandbox API key")
		}
	}

	return &SyncLayer{
		apiKey:     apiKey,
		baseURL:    baseURL,
		db:         db,
		httpClient: &http.Client{Timeout: 30 * time.Second},
	}
}

// SyncResult is the outcome of a dual-write operation.
type SyncResult struct {
	Success     bool
	OurID       string                 // Our internal ID (generated)
	PaddleID    string                 // Paddle's returned ID
	Entity      string                 // Entity type
	Data        map[string]interface{} // Final merged data
	DBError     error
	PaddleError error
}

// Create syncs a new entity to both our DB and Paddle.
//
// The record is inserted locally first with sync_status "pending", then sent
// to Paddle; on success the local row is updated with Paddle's ID and the
// merged response. Validation failures and local database failures are
// reported in DBError, Paddle failures in PaddleError.
//
// Create writes bookkeeping keys (id, created_at, updated_at, sync_status,
// pending_since, and after a successful sync paddle_id and
// last_paddle_sync_at) into the caller's data map.
func (s *SyncLayer) Create(entity string, data map[string]interface{}) SyncResult {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("unknown entity: %s", entity)}
	}
	if data == nil {
		data = make(map[string]interface{})
	}

	// 1. Validate required fields (absent, nil and "" all count as missing).
	for _, field := range mapping.RequiredFields {
		if isMissingValue(data[field]) {
			return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("missing required field: %s", field)}
		}
	}

	// 2. Resolve the endpoint (path params like {customer_id}) before writing
	// anything, so a record that can never be posted is not inserted.
	endpoint := s.resolveEndpoint(mapping.PaddleEndpoint, data)
	if strings.Contains(endpoint, "{") {
		return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("cannot resolve path parameters in %s", mapping.PaddleEndpoint)}
	}

	// 3. Generate our internal ID and bookkeeping fields from one timestamp.
	now := time.Now()
	ourID := fmt.Sprintf("%s_%d", mapping.IDPrefix, now.UnixNano())
	data["id"] = ourID
	data["created_at"] = now.Unix()
	data["updated_at"] = now.Unix()
	data["sync_status"] = "pending"
	data["pending_since"] = now.Unix() // Track for retry logic

	// 4. Build Paddle payload (map our fields to Paddle's if needed).
	paddlePayload := s.toPaddlePayload(data, mapping.FieldMap)

	// 5. Write to our DB first (always local first).
	if err := s.insertToDB(mapping.DBTable, data); err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: err}
	}

	// 6. Call Paddle API.
	paddleResp, err := s.callPaddle("POST", endpoint, paddlePayload)
	if err != nil {
		// Record the error on the row so it can be retried later.
		s.markSyncError(mapping.DBTable, ourID, err.Error())
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: err}
	}

	// 7. Extract Paddle ID from response.
	paddleID := s.extractID(paddleResp)
	if paddleID == "" {
		// Strange - Paddle didn't return ID.
		s.markSyncError(mapping.DBTable, ourID, "paddle did not return id")
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: fmt.Errorf("no id in paddle response")}
	}

	// 8. Update our DB with Paddle ID and synced status.
	_, dbErr := s.db.Exec(
		fmt.Sprintf("UPDATE %s SET paddle_id = ?, sync_status = 'synced', sync_error = NULL WHERE id = ?", mapping.DBTable),
		paddleID, ourID,
	)
	if dbErr != nil {
		// This is the worst case - Paddle created but we lost the ID.
		return SyncResult{
			Success:  false,
			Entity:   entity,
			OurID:    ourID,
			PaddleID: paddleID,
			DBError:  fmt.Errorf("paddle created (%s) but DB update failed: %v", paddleID, dbErr),
		}
	}

	// 9. Merge Paddle's response data back into our record (includes IDs,
	// timestamps, computed fields). The synced markers are applied to the
	// merged record as well as to data: the merge result still carries the
	// "pending" bookkeeping, and writing that back in step 10 would undo
	// step 8.
	mergedData := s.mergePaddleResponse(data, paddleResp, mapping.FieldMap)
	syncedAt := time.Now().Unix()
	for _, record := range []map[string]interface{}{data, mergedData} {
		record["paddle_id"] = paddleID
		record["sync_status"] = "synced"
		record["last_paddle_sync_at"] = syncedAt
		record["pending_since"] = nil // Clear pending since we're synced
	}

	// 10. Update our DB with merged data (Paddle may have added/modified fields).
	if err := s.updateDBWithMergedData(mapping.DBTable, ourID, mergedData); err != nil {
		// Log but don't fail - we have the core data.
		fmt.Printf("Warning: failed to update merged data for %s: %v\n", ourID, err)
	}

	return SyncResult{Success: true, Entity: entity, OurID: ourID, PaddleID: paddleID, Data: mergedData}
}

// Update syncs changes to both systems.
//
// The local row identified by ourID is updated first, then the change is sent
// to Paddle as a PATCH on the stored paddle_id. For entities whose endpoint
// contains path parameters (e.g. {customer_id}) the values must be present in
// data as strings; otherwise Update fails before touching either system.
// Update sets data["updated_at"] on the caller's map.
func (s *SyncLayer) Update(entity string, ourID string, data map[string]interface{}) SyncResult {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: fmt.Errorf("unknown entity: %s", entity)}
	}
	if data == nil {
		data = make(map[string]interface{})
	}

	// Resolve path parameters up front so we never write locally for a
	// request that cannot be addressed.
	collection := s.resolveEndpoint(mapping.PaddleEndpoint, data)
	if strings.Contains(collection, "{") {
		return SyncResult{
			Success:     false,
			Entity:      entity,
			OurID:       ourID,
			PaddleError: fmt.Errorf("cannot resolve path parameters in %s from update data", mapping.PaddleEndpoint),
		}
	}

	// Get current record to find Paddle ID. paddle_id is NULL until the first
	// successful sync, hence the nullable scan target.
	var stored sql.NullString
	err := s.db.QueryRow(fmt.Sprintf("SELECT paddle_id FROM %s WHERE id = ?", mapping.DBTable), ourID).Scan(&stored)
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: fmt.Errorf("loading paddle_id for %s: %w", ourID, err)}
	}
	paddleID := stored.String
	if paddleID == "" {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: fmt.Errorf("no paddle_id found, cannot update")}
	}

	// Update our DB.
	data["updated_at"] = time.Now().Unix()
	if err := s.updateDB(mapping.DBTable, ourID, data); err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: err}
	}

	// Update Paddle.
	paddlePayload := s.toPaddlePayload(data, mapping.FieldMap)
	endpoint := collection + "/" + url.PathEscape(paddleID)
	if _, err := s.callPaddle("PATCH", endpoint, paddlePayload); err != nil {
		return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleID: paddleID, PaddleError: err}
	}

	return SyncResult{Success: true, Entity: entity, OurID: ourID, PaddleID: paddleID}
}

// RetryAll retries every registered entity's records that have been pending
// for more than 5 minutes, as selected by FindStalePending.
func (s *SyncLayer) RetryAll() {
	for entity := range EntityRegistry {
		// Only retry items pending for more than 5 minutes.
		ids, err := s.FindStalePending(entity, 5)
		if err != nil {
			fmt.Printf("Warning: failed to find stale %s records: %v\n", entity, err)
			continue
		}
		for _, id := range ids {
			s.retryEntity(entity, id)
		}
	}
}

// RetryImmediate retries a specific entity immediately (for manual retry).
// Failures are logged; the method has no return value.
func (s *SyncLayer) RetryImmediate(entity, ourID string) {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		fmt.Printf("Warning: cannot retry unknown entity: %s\n", entity)
		return
	}

	// Reset status to trigger retry.
	_, err := s.db.Exec(
		fmt.Sprintf("UPDATE %s SET sync_status = 'pending', pending_since = ? WHERE id = ?", mapping.DBTable),
		time.Now().Unix(), ourID,
	)
	if err != nil {
		fmt.Printf("Warning: failed to reset sync status for %s: %v\n", ourID, err)
	}

	// Use the regular retry.
	s.retryEntity(entity, ourID)
}

// Private helpers

// isMissingValue reports whether a required field value is unusable:
// absent or nil, or the empty string.
func isMissingValue(v interface{}) bool {
	if v == nil {
		return true
	}
	str, isString := v.(string)
	return isString && str == ""
}

// isSafeIdentifier reports whether name consists only of ASCII letters,
// digits and underscores and does not start with a digit, so it can be
// interpolated into SQL as a column name.
func isSafeIdentifier(name string) bool {
	if name == "" {
		return false
	}
	for i, r := range name {
		switch {
		case r == '_', r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z':
		case r >= '0' && r <= '9' && i > 0:
		default:
			return false
		}
	}
	return true
}

// dbValue converts a decoded-JSON value into something database/sql can bind.
// Maps, slices and arrays (other than []byte) are not valid driver values, so
// they are stored as their JSON text; everything else is passed through.
func dbValue(v interface{}) (interface{}, error) {
	if v == nil {
		return nil, nil
	}
	if _, isBytes := v.([]byte); isBytes {
		return v, nil
	}
	switch reflect.TypeOf(v).Kind() {
	case reflect.Map, reflect.Slice, reflect.Array:
		encoded, err := json.Marshal(v)
		if err != nil {
			return nil, err
		}
		return string(encoded), nil
	}
	return v, nil
}

// columnsAndArgs returns the validated, sorted column names of data and the
// matching bind values. When skipID is set the "id" key is left out. Keys come
// from external JSON, so any key that is not a safe identifier is rejected
// rather than interpolated into SQL.
func columnsAndArgs(data map[string]interface{}, skipID bool) ([]string, []interface{}, error) {
	cols := make([]string, 0, len(data))
	for col := range data {
		if skipID && col == "id" {
			continue
		}
		if !isSafeIdentifier(col) {
			return nil, nil, fmt.Errorf("invalid column name %q", col)
		}
		cols = append(cols, col)
	}
	sort.Strings(cols) // map order is random; keep the generated SQL stable

	args := make([]interface{}, 0, len(cols)+1)
	for _, col := range cols {
		val, err := dbValue(data[col])
		if err != nil {
			return nil, nil, fmt.Errorf("encoding column %s: %w", col, err)
		}
		args = append(args, val)
	}
	return cols, args, nil
}

// buildInsertSQL builds a parameterized INSERT for data into table.
func buildInsertSQL(table string, data map[string]interface{}) (string, []interface{}, error) {
	cols, args, err := columnsAndArgs(data, false)
	if err != nil {
		return "", nil, err
	}
	if len(cols) == 0 {
		return "", nil, fmt.Errorf("insert into %s: no columns", table)
	}
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(cols)), ", ")
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, strings.Join(cols, ", "), placeholders)
	return query, args, nil
}

// buildUpdateSQL builds a parameterized UPDATE of the row with the given id.
// The "id" key of data is never written. An empty query is returned when data
// holds no updatable columns.
func buildUpdateSQL(table, id string, data map[string]interface{}) (string, []interface{}, error) {
	cols, args, err := columnsAndArgs(data, true)
	if err != nil {
		return "", nil, err
	}
	if len(cols) == 0 {
		return "", nil, nil
	}
	sets := make([]string, 0, len(cols))
	for _, col := range cols {
		sets = append(sets, col+" = ?")
	}
	args = append(args, id)
	query := fmt.Sprintf("UPDATE %s SET %s WHERE id = ?", table, strings.Join(sets, ", "))
	return query, args, nil
}

// insertToDB inserts data as one row of table, one column per map key.
func (s *SyncLayer) insertToDB(table string, data map[string]interface{}) error {
	query, args, err := buildInsertSQL(table, data)
	if err != nil {
		return err
	}
	_, err = s.db.Exec(query, args...)
	return err
}

// updateDB writes the keys of data to the row of table whose id matches.
// It is a no-op when data has nothing to write.
func (s *SyncLayer) updateDB(table string, id string, data map[string]interface{}) error {
	query, args, err := buildUpdateSQL(table, id, data)
	if err != nil || query == "" {
		return err
	}
	_, err = s.db.Exec(query, args...)
	return err
}

// markSyncError flags a row as failed to sync and stores the reason.
// A failure to record the flag is logged, not returned.
func (s *SyncLayer) markSyncError(table, ourID, reason string) {
	_, err := s.db.Exec(
		fmt.Sprintf("UPDATE %s SET sync_status = 'error', sync_error = ? WHERE id = ?", table),
		reason, ourID,
	)
	if err != nil {
		fmt.Printf("Warning: failed to record sync error for %s: %v\n", ourID, err)
	}
}

// internalFields are bookkeeping columns that are never sent to Paddle.
var internalFields = map[string]bool{
	"id":                  true,
	"paddle_id":           true,
	"sync_status":         true,
	"sync_error":          true,
	"created_at":          true,
	"updated_at":          true,
	"pending_since":       true,
	"last_paddle_sync_at": true,
	"paddle_event_id":     true,
	"internal_notes":      true,
}

// toPaddlePayload builds the request body for Paddle from our record:
// internal fields are dropped and remaining keys are renamed through fieldMap
// where it gives a non-empty Paddle name.
func (s *SyncLayer) toPaddlePayload(data map[string]interface{}, fieldMap map[string]string) map[string]interface{} {
	payload := make(map[string]interface{}, len(data))
	for ourField, val := range data {
		if internalFields[ourField] {
			continue
		}
		// Map field name if needed.
		paddleField := ourField
		if mapped := fieldMap[ourField]; mapped != "" {
			paddleField = mapped
		}
		payload[paddleField] = val
	}
	return payload
}

// resolveEndpoint replaces {key} placeholders in endpoint with the matching
// string values from data (e.g. {customer_id}). Values are path-escaped so
// they cannot alter the URL structure; non-string values are ignored, leaving
// their placeholder in place.
func (s *SyncLayer) resolveEndpoint(endpoint string, data map[string]interface{}) string {
	resolved := endpoint
	for key, val := range data {
		strVal, ok := val.(string)
		if !ok {
			continue
		}
		resolved = strings.ReplaceAll(resolved, "{"+key+"}", url.PathEscape(strVal))
	}
	return resolved
}

// callPaddle sends payload as JSON to the Paddle API and returns the decoded
// JSON response. A status of 400 or above is returned as an error carrying
// the response body. An empty response body yields an empty map.
func (s *SyncLayer) callPaddle(method, endpoint string, payload map[string]interface{}) (map[string]interface{}, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("encoding paddle payload: %w", err)
	}

	req, err := http.NewRequest(method, s.baseURL+endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+s.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading paddle response: %w", err)
	}
	if resp.StatusCode >= 400 {
		return nil, fmt.Errorf("paddle %s %d: %s", method, resp.StatusCode, string(respBody))
	}

	result := make(map[string]interface{})
	if len(bytes.TrimSpace(respBody)) == 0 {
		return result, nil
	}
	if err := json.Unmarshal(respBody, &result); err != nil {
		return nil, fmt.Errorf("decoding paddle %s response: %w", method, err)
	}
	return result, nil
}

// extractID returns the string at response["data"]["id"], or "" when the
// response does not have that shape.
func (s *SyncLayer) extractID(response map[string]interface{}) string {
	// Paddle wraps response in "data": { "id": "..." }
	data, ok := response["data"].(map[string]interface{})
	if !ok {
		return ""
	}
	id, _ := data["id"].(string)
	return id
}

// retryEntity marks a record for retry.
// Simplified: it only resets sync_status to 'pending'; no Paddle call is made.
func (s *SyncLayer) retryEntity(entity, ourID string) {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		fmt.Printf("Warning: cannot retry unknown entity: %s\n", entity)
		return
	}
	_, err := s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'pending' WHERE id = ?", mapping.DBTable), ourID)
	if err != nil {
		fmt.Printf("Warning: failed to mark %s for retry: %v\n", ourID, err)
	}
}

// mergePaddleResponse merges Paddle's response data with our original data.
// Paddle field names are mapped back to ours through the reverse of fieldMap.
// Our id, sync_status and sync_error are never overwritten. The result is a
// new map; ourData is not modified.
func (s *SyncLayer) mergePaddleResponse(ourData map[string]interface{}, paddleResp map[string]interface{}, fieldMap map[string]string) map[string]interface{} {
	// Start with our original data.
	merged := make(map[string]interface{}, len(ourData))
	for k, v := range ourData {
		merged[k] = v
	}

	// Extract Paddle's data object; some responses aren't wrapped.
	paddleData, wrapped := paddleResp["data"].(map[string]interface{})
	if !wrapped {
		paddleData = paddleResp
	}

	// Build reverse field map (paddle_field → our_field).
	reverseMap := make(map[string]string, len(fieldMap))
	for ourField, paddleField := range fieldMap {
		if paddleField != "" {
			reverseMap[paddleField] = ourField
		}
	}

	// Merge Paddle fields, mapping names back to ours.
	for paddleField, val := range paddleData {
		ourField := paddleField
		if mapped, ok := reverseMap[paddleField]; ok {
			ourField = mapped
		}
		// Don't overwrite our internal fields.
		switch ourField {
		case "id", "sync_status", "sync_error":
			continue
		}
		merged[ourField] = val
	}
	return merged
}

// updateDBWithMergedData updates our DB record with merged data from Paddle.
// The id field itself is never written.
func (s *SyncLayer) updateDBWithMergedData(table, ourID string, mergedData map[string]interface{}) error {
	return s.updateDB(table, ourID, mergedData)
}

// HandleWebhook processes inbound updates from Paddle (subscriptions, status
// changes, etc.) and updates our DB to match Paddle's state.
//
// The local record is located by paddleData["id"]. When paddleData carries an
// event_id equal to the one already stored on the record, the webhook is
// treated as already processed and nothing is written.
func (s *SyncLayer) HandleWebhook(entity string, paddleData map[string]interface{}) error {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return fmt.Errorf("unknown entity for webhook: %s", entity)
	}

	// Extract Paddle ID.
	paddleID, ok := paddleData["id"].(string)
	if !ok {
		return fmt.Errorf("no id in webhook data")
	}

	// Extract event ID if present (for idempotency).
	eventID, _ := paddleData["event_id"].(string)

	// Find our record by Paddle ID. paddle_event_id is NULL until the first
	// webhook is stored, so it must be scanned into a nullable value.
	var ourID string
	var existingEventID sql.NullString
	err := s.db.QueryRow(
		fmt.Sprintf("SELECT id, paddle_event_id FROM %s WHERE paddle_id = ?", mapping.DBTable),
		paddleID,
	).Scan(&ourID, &existingEventID)
	if err != nil {
		return fmt.Errorf("no local record for paddle %s: %w", paddleID, err)
	}

	// Idempotency check: skip if we already processed this event.
	if eventID != "" && existingEventID.Valid && existingEventID.String == eventID {
		return nil
	}

	// Build reverse field map (paddle_field → our_field).
	reverseMap := make(map[string]string, len(mapping.FieldMap))
	for ourField, paddleField := range mapping.FieldMap {
		if paddleField != "" {
			reverseMap[paddleField] = ourField
		}
	}

	// Map Paddle fields to our fields.
	ourData := make(map[string]interface{}, len(paddleData)+6)
	for paddleField, val := range paddleData {
		ourField := paddleField
		if mapped, ok := reverseMap[paddleField]; ok {
			ourField = mapped
		}
		// Don't overwrite internal fields.
		if ourField == "id" || ourField == "paddle_id" {
			continue
		}
		ourData[ourField] = val
	}

	// Update our record with webhook data.
	now := time.Now().Unix()
	ourData["updated_at"] = now
	ourData["sync_status"] = "synced" // Webhook confirms sync
	ourData["sync_error"] = nil
	ourData["last_paddle_sync_at"] = now
	ourData["pending_since"] = nil // Clear pending
	if eventID != "" {
		ourData["paddle_event_id"] = eventID
	}

	return s.updateDBWithMergedData(mapping.DBTable, ourID, ourData)
}

// FindStalePending finds records of entity whose sync_status is 'pending' and
// whose pending_since is more than the given number of minutes in the past.
// It returns their internal IDs.
func (s *SyncLayer) FindStalePending(entity string, minutes int) ([]string, error) {
	mapping, ok := EntityRegistry[entity]
	if !ok {
		return nil, fmt.Errorf("unknown entity: %s", entity)
	}

	cutoff := time.Now().Add(-time.Duration(minutes) * time.Minute).Unix()
	rows, err := s.db.Query(
		fmt.Sprintf("SELECT id FROM %s WHERE sync_status = 'pending' AND pending_since < ?", mapping.DBTable),
		cutoff,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return ids, nil
}