581 lines
17 KiB
Go
581 lines
17 KiB
Go
// Clavitor Generic Dual-Write Abstraction Layer
|
|
// JSON in → writes to our DB + Paddle API automatically
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// isProductionServer checks if we're running on the production clavitor.ai server
|
|
func isProductionServer() bool {
|
|
prodIPs, err := net.LookupIP("clavitor.ai")
|
|
if err != nil || len(prodIPs) == 0 {
|
|
return false
|
|
}
|
|
|
|
addrs, err := net.InterfaceAddrs()
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
for _, addr := range addrs {
|
|
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
|
if ipnet.IP.To4() != nil {
|
|
for _, prodIP := range prodIPs {
|
|
if ipnet.IP.Equal(prodIP) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// EntityMap defines how each entity maps between our DB and Paddle
|
|
type EntityMap struct {
|
|
PaddleEndpoint string // e.g., "/customers"
|
|
IDPrefix string // e.g., "ctm"
|
|
DBTable string // e.g., "customers"
|
|
FieldMap map[string]string // our_field → paddle_field (if different)
|
|
RequiredFields []string // Must be present in input
|
|
}
|
|
|
|
// Registry of all known entities
|
|
var EntityRegistry = map[string]EntityMap{
|
|
"customers": {
|
|
PaddleEndpoint: "/customers",
|
|
IDPrefix: "ctm",
|
|
DBTable: "customers",
|
|
FieldMap: map[string]string{
|
|
"id": "", // we generate this
|
|
"paddle_id": "id", // store Paddle's ID separately
|
|
"created_at": "created_at",
|
|
"updated_at": "updated_at",
|
|
},
|
|
RequiredFields: []string{"email"},
|
|
},
|
|
"addresses": {
|
|
PaddleEndpoint: "/customers/{customer_id}/addresses",
|
|
IDPrefix: "add",
|
|
DBTable: "addresses",
|
|
FieldMap: map[string]string{},
|
|
RequiredFields: []string{"customer_id", "country_code"},
|
|
},
|
|
"businesses": {
|
|
PaddleEndpoint: "/customers/{customer_id}/businesses",
|
|
IDPrefix: "biz",
|
|
DBTable: "businesses",
|
|
FieldMap: map[string]string{},
|
|
RequiredFields: []string{"customer_id", "name"},
|
|
},
|
|
"subscriptions": {
|
|
PaddleEndpoint: "/subscriptions",
|
|
IDPrefix: "sub",
|
|
DBTable: "subscriptions",
|
|
FieldMap: map[string]string{
|
|
"id": "",
|
|
"paddle_id": "id",
|
|
"created_at": "created_at",
|
|
"updated_at": "updated_at",
|
|
},
|
|
RequiredFields: []string{"customer_id", "address_id", "items"},
|
|
},
|
|
"transactions": {
|
|
PaddleEndpoint: "/transactions",
|
|
IDPrefix: "txn",
|
|
DBTable: "transactions",
|
|
FieldMap: map[string]string{},
|
|
RequiredFields: []string{"customer_id", "address_id", "items"},
|
|
},
|
|
"discounts": {
|
|
PaddleEndpoint: "/discounts",
|
|
IDPrefix: "dsc",
|
|
DBTable: "discounts",
|
|
FieldMap: map[string]string{
|
|
"id": "",
|
|
"paddle_id": "id",
|
|
"created_at": "created_at",
|
|
"updated_at": "updated_at",
|
|
},
|
|
RequiredFields: []string{"description", "type", "amount"},
|
|
},
|
|
}
|
|
|
|
type SyncLayer struct {
|
|
apiKey string
|
|
baseURL string
|
|
db *sql.DB
|
|
httpClient *http.Client
|
|
}
|
|
|
|
// Default API key - in production load from env
|
|
// For sandbox: get from https://sandbox-vendors.paddle.com/authentication
|
|
const DefaultAPIKey = "pdl_sdbx_apikey_01knegw36v6cvybp2y5652xpmq_weT2XzhV6Qk0rGEYDY0V5X_Aig"
|
|
|
|
// NewSyncLayer creates a sync layer with auto-detected sandbox mode
|
|
// Uses isProductionServer() from main.go to determine environment
|
|
func NewSyncLayer(apiKey string, db *sql.DB) *SyncLayer {
|
|
if apiKey == "" {
|
|
apiKey = DefaultAPIKey
|
|
}
|
|
|
|
// Auto-detect: if not on production server, use sandbox
|
|
sandbox := !isProductionServer()
|
|
if sandbox {
|
|
fmt.Println("Using PADDLE SANDBOX environment")
|
|
} else {
|
|
fmt.Println("Using PADDLE LIVE environment")
|
|
}
|
|
|
|
baseURL := "https://api.paddle.com"
|
|
if sandbox {
|
|
baseURL = "https://sandbox-api.paddle.com"
|
|
}
|
|
|
|
return &SyncLayer{
|
|
apiKey: apiKey,
|
|
baseURL: baseURL,
|
|
db: db,
|
|
httpClient: &http.Client{Timeout: 30 * time.Second},
|
|
}
|
|
}
|
|
|
|
// SyncResult from dual-write operation
|
|
type SyncResult struct {
|
|
Success bool
|
|
OurID string // Our internal ID (generated)
|
|
PaddleID string // Paddle's returned ID
|
|
Entity string // Entity type
|
|
Data map[string]interface{} // Final merged data
|
|
DBError error
|
|
PaddleError error
|
|
}
|
|
|
|
// Create syncs a new entity to both our DB and Paddle
|
|
func (s *SyncLayer) Create(entity string, data map[string]interface{}) SyncResult {
|
|
mapping, ok := EntityRegistry[entity]
|
|
if !ok {
|
|
return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("unknown entity: %s", entity)}
|
|
}
|
|
|
|
// 1. Validate required fields
|
|
for _, field := range mapping.RequiredFields {
|
|
if _, ok := data[field]; !ok {
|
|
return SyncResult{Success: false, Entity: entity, DBError: fmt.Errorf("missing required field: %s", field)}
|
|
}
|
|
}
|
|
|
|
// 2. Generate our internal ID
|
|
ourID := fmt.Sprintf("%s_%d", mapping.IDPrefix, time.Now().UnixNano())
|
|
data["id"] = ourID
|
|
data["created_at"] = time.Now().Unix()
|
|
data["updated_at"] = time.Now().Unix()
|
|
data["sync_status"] = "pending"
|
|
data["pending_since"] = time.Now().Unix() // Track for retry logic
|
|
|
|
// 3. Build Paddle payload (map our fields to Paddle's if needed)
|
|
paddlePayload := s.toPaddlePayload(data, mapping.FieldMap)
|
|
|
|
// 4. Write to our DB first (always local first)
|
|
if err := s.insertToDB(mapping.DBTable, data); err != nil {
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: err}
|
|
}
|
|
|
|
// 5. Determine endpoint (handle path params like {customer_id})
|
|
endpoint := s.resolveEndpoint(mapping.PaddleEndpoint, data)
|
|
|
|
// 6. Call Paddle API
|
|
paddleResp, err := s.callPaddle("POST", endpoint, paddlePayload)
|
|
if err != nil {
|
|
// Update DB with error, will retry later
|
|
s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'error', sync_error = ? WHERE id = ?", mapping.DBTable),
|
|
err.Error(), ourID)
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: err}
|
|
}
|
|
|
|
// 7. Extract Paddle ID from response
|
|
paddleID := s.extractID(paddleResp)
|
|
if paddleID == "" {
|
|
// Strange - Paddle didn't return ID
|
|
s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'error', sync_error = ? WHERE id = ?", mapping.DBTable),
|
|
"paddle did not return id", ourID)
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleError: fmt.Errorf("no id in paddle response")}
|
|
}
|
|
|
|
// 8. Update our DB with Paddle ID and synced status
|
|
_, dbErr := s.db.Exec(fmt.Sprintf("UPDATE %s SET paddle_id = ?, sync_status = 'synced', sync_error = NULL WHERE id = ?", mapping.DBTable),
|
|
paddleID, ourID)
|
|
if dbErr != nil {
|
|
// This is the worst case - Paddle created but we lost the ID
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleID: paddleID,
|
|
DBError: fmt.Errorf("paddle created (%s) but DB update failed: %v", paddleID, dbErr)}
|
|
}
|
|
|
|
// 9. Merge Paddle's response data back into our record (includes IDs, timestamps, computed fields)
|
|
mergedData := s.mergePaddleResponse(data, paddleResp, mapping.FieldMap)
|
|
data["paddle_id"] = paddleID
|
|
data["sync_status"] = "synced"
|
|
data["last_paddle_sync_at"] = time.Now().Unix()
|
|
data["pending_since"] = nil // Clear pending since we're synced
|
|
|
|
// 10. Update our DB with merged data (Paddle may have added/modified fields)
|
|
if err := s.updateDBWithMergedData(mapping.DBTable, ourID, mergedData); err != nil {
|
|
// Log but don't fail - we have the core data
|
|
fmt.Printf("Warning: failed to update merged data for %s: %v\n", ourID, err)
|
|
}
|
|
|
|
return SyncResult{Success: true, Entity: entity, OurID: ourID, PaddleID: paddleID, Data: mergedData}
|
|
}
|
|
|
|
// Update syncs changes to both systems
|
|
func (s *SyncLayer) Update(entity string, ourID string, data map[string]interface{}) SyncResult {
|
|
mapping := EntityRegistry[entity]
|
|
|
|
// Get current record to find Paddle ID
|
|
var paddleID string
|
|
row := s.db.QueryRow(fmt.Sprintf("SELECT paddle_id FROM %s WHERE id = ?", mapping.DBTable), ourID)
|
|
row.Scan(&paddleID)
|
|
|
|
if paddleID == "" {
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID,
|
|
PaddleError: fmt.Errorf("no paddle_id found, cannot update")}
|
|
}
|
|
|
|
// Update our DB
|
|
data["updated_at"] = time.Now().Unix()
|
|
if err := s.updateDB(mapping.DBTable, ourID, data); err != nil {
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID, DBError: err}
|
|
}
|
|
|
|
// Update Paddle
|
|
paddlePayload := s.toPaddlePayload(data, mapping.FieldMap)
|
|
endpoint := mapping.PaddleEndpoint + "/" + paddleID
|
|
_, err := s.callPaddle("PATCH", endpoint, paddlePayload)
|
|
if err != nil {
|
|
return SyncResult{Success: false, Entity: entity, OurID: ourID, PaddleID: paddleID, PaddleError: err}
|
|
}
|
|
|
|
return SyncResult{Success: true, Entity: entity, OurID: ourID, PaddleID: paddleID}
|
|
}
|
|
|
|
// RetryAll retries all pending/error syncs that are stale (> 5 minutes)
|
|
func (s *SyncLayer) RetryAll() {
|
|
for entity := range EntityRegistry {
|
|
// Only retry items pending for more than 5 minutes
|
|
ids, err := s.FindStalePending(entity, 5)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
for _, id := range ids {
|
|
s.retryEntity(entity, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// RetryImmediate retries a specific entity immediately (for manual retry)
|
|
func (s *SyncLayer) RetryImmediate(entity, ourID string) {
|
|
// Reset status to trigger retry
|
|
s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'pending', pending_since = ? WHERE id = ?", EntityRegistry[entity].DBTable),
|
|
time.Now().Unix(), ourID)
|
|
|
|
// Use the regular retry
|
|
s.retryEntity(entity, ourID)
|
|
}
|
|
|
|
// Private helpers
|
|
|
|
func (s *SyncLayer) insertToDB(table string, data map[string]interface{}) error {
|
|
columns := []string{}
|
|
placeholders := []string{}
|
|
values := []interface{}{}
|
|
|
|
for col, val := range data {
|
|
columns = append(columns, col)
|
|
placeholders = append(placeholders, "?")
|
|
values = append(values, val)
|
|
}
|
|
|
|
sql := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
|
table,
|
|
strings.Join(columns, ", "),
|
|
strings.Join(placeholders, ", "))
|
|
|
|
_, err := s.db.Exec(sql, values...)
|
|
return err
|
|
}
|
|
|
|
func (s *SyncLayer) updateDB(table string, id string, data map[string]interface{}) error {
|
|
sets := []string{}
|
|
values := []interface{}{}
|
|
|
|
for col, val := range data {
|
|
sets = append(sets, fmt.Sprintf("%s = ?", col))
|
|
values = append(values, val)
|
|
}
|
|
values = append(values, id)
|
|
|
|
sql := fmt.Sprintf("UPDATE %s SET %s WHERE id = ?",
|
|
table,
|
|
strings.Join(sets, ", "))
|
|
|
|
_, err := s.db.Exec(sql, values...)
|
|
return err
|
|
}
|
|
|
|
func (s *SyncLayer) toPaddlePayload(data map[string]interface{}, fieldMap map[string]string) map[string]interface{} {
|
|
result := make(map[string]interface{})
|
|
|
|
for ourField, val := range data {
|
|
// Skip internal fields
|
|
if ourField == "id" || ourField == "paddle_id" || ourField == "sync_status" || ourField == "sync_error" {
|
|
continue
|
|
}
|
|
if ourField == "created_at" || ourField == "updated_at" || ourField == "pending_since" {
|
|
continue
|
|
}
|
|
if ourField == "last_paddle_sync_at" || ourField == "paddle_event_id" || ourField == "internal_notes" {
|
|
continue
|
|
}
|
|
|
|
// Map field name if needed
|
|
paddleField := ourField
|
|
if mapped, ok := fieldMap[ourField]; ok && mapped != "" {
|
|
paddleField = mapped
|
|
}
|
|
|
|
result[paddleField] = val
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (s *SyncLayer) resolveEndpoint(endpoint string, data map[string]interface{}) string {
|
|
// Replace {customer_id} etc. with actual values
|
|
result := endpoint
|
|
for key, val := range data {
|
|
placeholder := "{" + key + "}"
|
|
if strVal, ok := val.(string); ok {
|
|
result = strings.ReplaceAll(result, placeholder, strVal)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (s *SyncLayer) callPaddle(method, endpoint string, payload map[string]interface{}) (map[string]interface{}, error) {
|
|
url := s.baseURL + endpoint
|
|
body, _ := json.Marshal(payload)
|
|
|
|
req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("Authorization", "Bearer "+s.apiKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := s.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
|
|
if resp.StatusCode >= 400 {
|
|
return nil, fmt.Errorf("paddle %s %d: %s", method, resp.StatusCode, string(respBody))
|
|
}
|
|
|
|
var result map[string]interface{}
|
|
json.Unmarshal(respBody, &result)
|
|
return result, nil
|
|
}
|
|
|
|
func (s *SyncLayer) extractID(response map[string]interface{}) string {
|
|
// Paddle wraps response in "data": { "id": "..." }
|
|
if data, ok := response["data"].(map[string]interface{}); ok {
|
|
if id, ok := data["id"].(string); ok {
|
|
return id
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (s *SyncLayer) retryEntity(entity, ourID string) {
|
|
// Simplified: just mark for retry by resetting status
|
|
mapping := EntityRegistry[entity]
|
|
s.db.Exec(fmt.Sprintf("UPDATE %s SET sync_status = 'pending' WHERE id = ?", mapping.DBTable), ourID)
|
|
}
|
|
|
|
// mergePaddleResponse merges Paddle's response data with our original data
|
|
// Maps Paddle field names back to our field names using reverse mapping
|
|
func (s *SyncLayer) mergePaddleResponse(ourData map[string]interface{}, paddleResp map[string]interface{}, fieldMap map[string]string) map[string]interface{} {
|
|
result := make(map[string]interface{})
|
|
|
|
// Start with our original data
|
|
for k, v := range ourData {
|
|
result[k] = v
|
|
}
|
|
|
|
// Extract Paddle's data object
|
|
var paddleData map[string]interface{}
|
|
if data, ok := paddleResp["data"].(map[string]interface{}); ok {
|
|
paddleData = data
|
|
} else {
|
|
paddleData = paddleResp // Some responses aren't wrapped
|
|
}
|
|
|
|
// Build reverse field map (paddle_field → our_field)
|
|
reverseMap := make(map[string]string)
|
|
for ourField, paddleField := range fieldMap {
|
|
if paddleField != "" {
|
|
reverseMap[paddleField] = ourField
|
|
}
|
|
}
|
|
|
|
// Merge Paddle fields, mapping names back to ours
|
|
for paddleField, val := range paddleData {
|
|
ourField := paddleField
|
|
if mapped, ok := reverseMap[paddleField]; ok {
|
|
ourField = mapped
|
|
}
|
|
|
|
// Don't overwrite our internal fields
|
|
if ourField == "id" || ourField == "sync_status" || ourField == "sync_error" {
|
|
continue
|
|
}
|
|
|
|
result[ourField] = val
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// updateDBWithMergedData updates our DB record with merged data from Paddle
|
|
func (s *SyncLayer) updateDBWithMergedData(table, ourID string, mergedData map[string]interface{}) error {
|
|
sets := []string{}
|
|
values := []interface{}{}
|
|
|
|
for col, val := range mergedData {
|
|
// Skip the ID field itself
|
|
if col == "id" {
|
|
continue
|
|
}
|
|
sets = append(sets, fmt.Sprintf("%s = ?", col))
|
|
values = append(values, val)
|
|
}
|
|
values = append(values, ourID)
|
|
|
|
sql := fmt.Sprintf("UPDATE %s SET %s WHERE id = ?",
|
|
table,
|
|
strings.Join(sets, ", "))
|
|
|
|
_, err := s.db.Exec(sql, values...)
|
|
return err
|
|
}
|
|
|
|
// HandleWebhook processes inbound updates from Paddle (subscriptions, status changes, etc.)
|
|
// Updates our DB to match Paddle's state
|
|
func (s *SyncLayer) HandleWebhook(entity string, paddleData map[string]interface{}) error {
|
|
mapping, ok := EntityRegistry[entity]
|
|
if !ok {
|
|
return fmt.Errorf("unknown entity for webhook: %s", entity)
|
|
}
|
|
|
|
// Extract Paddle ID
|
|
paddleID, ok := paddleData["id"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("no id in webhook data")
|
|
}
|
|
|
|
// Extract event ID if present (for idempotency)
|
|
eventID, _ := paddleData["event_id"].(string)
|
|
|
|
// Find our record by Paddle ID
|
|
var ourID string
|
|
var existingEventID string
|
|
err := s.db.QueryRow(
|
|
fmt.Sprintf("SELECT id, paddle_event_id FROM %s WHERE paddle_id = ?", mapping.DBTable),
|
|
paddleID,
|
|
).Scan(&ourID, &existingEventID)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("no local record for paddle %s: %v", paddleID, err)
|
|
}
|
|
|
|
// Idempotency check: skip if we already processed this event
|
|
if eventID != "" && existingEventID == eventID {
|
|
return nil // Already processed
|
|
}
|
|
|
|
// Map Paddle fields to our fields
|
|
ourData := make(map[string]interface{})
|
|
for paddleField, val := range paddleData {
|
|
ourField := paddleField
|
|
// Check if this Paddle field maps to one of ours
|
|
for ourF, paddleF := range mapping.FieldMap {
|
|
if paddleF == paddleField && paddleF != "" {
|
|
ourField = ourF
|
|
break
|
|
}
|
|
}
|
|
|
|
// Don't overwrite internal fields
|
|
if ourField == "id" || ourField == "paddle_id" {
|
|
continue
|
|
}
|
|
|
|
ourData[ourField] = val
|
|
}
|
|
|
|
// Update our record with webhook data
|
|
ourData["updated_at"] = time.Now().Unix()
|
|
ourData["sync_status"] = "synced" // Webhook confirms sync
|
|
ourData["sync_error"] = nil
|
|
ourData["last_paddle_sync_at"] = time.Now().Unix()
|
|
ourData["pending_since"] = nil // Clear pending
|
|
if eventID != "" {
|
|
ourData["paddle_event_id"] = eventID
|
|
}
|
|
|
|
return s.updateDBWithMergedData(mapping.DBTable, ourID, ourData)
|
|
}
|
|
|
|
// FindStalePending finds records pending for more than X minutes
|
|
func (s *SyncLayer) FindStalePending(entity string, minutes int) ([]string, error) {
|
|
mapping, ok := EntityRegistry[entity]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown entity: %s", entity)
|
|
}
|
|
|
|
cutoff := time.Now().Add(-time.Duration(minutes) * time.Minute).Unix()
|
|
|
|
rows, err := s.db.Query(
|
|
fmt.Sprintf("SELECT id FROM %s WHERE sync_status = 'pending' AND pending_since < ?", mapping.DBTable),
|
|
cutoff,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
rows.Scan(&id)
|
|
ids = append(ids, id)
|
|
}
|
|
|
|
return ids, nil
|
|
}
|