302 lines
7.9 KiB
Go
302 lines
7.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"inou/lib"
|
|
)
|
|
|
|
// Variant is a single genotyped SNP parsed from an uploaded genome file.
type Variant struct {
	// RSID is the dbSNP reference identifier (always "rs"-prefixed; other
	// identifiers are rejected during parsing).
	RSID string
	// Genotype is the allele call; two-letter genotypes are normalized with
	// alleles sorted ("GA" is stored as "AG") so equivalent calls compare equal.
	Genotype string
}
|
|
|
|
// detectGenomeFormat classifies a raw genotype file by inspecting its first
// data line: quoted fields indicate MyHeritage CSV; tab-separated lines are
// AncestryDNA (five or more columns) or 23andMe (fewer); anything else is
// treated as FTDNA plain CSV.
func detectGenomeFormat(firstLine string) string {
	switch {
	case strings.Contains(firstLine, `"`):
		return "myheritage"
	case strings.Contains(firstLine, "\t"):
		if cols := strings.Split(firstLine, "\t"); len(cols) >= 5 {
			return "ancestry"
		}
		return "23andme"
	default:
		return "ftdna"
	}
}
|
|
|
|
// parseGenomeVariant extracts the rsid and normalized genotype from one data
// line of a genotype file in the given format ("ancestry", "23andme",
// "myheritage", or "ftdna"). It returns ok=false for comment/header lines,
// malformed lines, no-calls, and non-"rs" identifiers. Two-letter genotypes
// are normalized by sorting the alleles ("GA" -> "AG") so equivalent calls
// compare equal.
func parseGenomeVariant(line, format string) (string, string, bool) {
	// Skip comments and the header-line styles emitted by each vendor.
	if strings.HasPrefix(line, "#") || strings.HasPrefix(line, "rsid") || strings.HasPrefix(line, "RSID") || (strings.HasPrefix(line, "\"") && strings.Contains(line, "RSID")) {
		return "", "", false
	}

	var rsid, genotype string

	switch format {
	case "ancestry":
		// AncestryDNA: rsid<TAB>chrom<TAB>pos<TAB>allele1<TAB>allele2
		parts := strings.Split(line, "\t")
		if len(parts) < 5 {
			return "", "", false
		}
		rsid = parts[0]
		allele1, allele2 := parts[3], parts[4]
		if allele1 == "0" || allele2 == "0" {
			return "", "", false // "0" marks an Ancestry no-call
		}
		genotype = allele1 + allele2

	case "23andme":
		// 23andMe: rsid<TAB>chrom<TAB>pos<TAB>genotype
		parts := strings.Split(line, "\t")
		if len(parts) < 4 {
			return "", "", false
		}
		rsid = parts[0]
		genotype = parts[3]

	case "myheritage":
		// MyHeritage: quoted CSV "rsid","chrom","pos","genotype"
		line = strings.ReplaceAll(line, "\"", "")
		parts := strings.Split(line, ",")
		if len(parts) < 4 {
			return "", "", false
		}
		rsid = parts[0]
		genotype = parts[3]

	case "ftdna":
		// FTDNA: plain CSV rsid,chrom,pos,genotype
		parts := strings.Split(line, ",")
		if len(parts) < 4 {
			return "", "", false
		}
		rsid = parts[0]
		genotype = parts[3]
	}

	// Fix: the "--" no-call marker was previously filtered only for 23andMe,
	// but MyHeritage and FTDNA exports use the same marker. Filter uniformly,
	// along with empty genotypes (also covers unknown formats, where both
	// rsid and genotype stay empty).
	if genotype == "" || genotype == "--" {
		return "", "", false
	}

	// Only rs-prefixed identifiers are usable for SNPedia matching.
	if !strings.HasPrefix(rsid, "rs") {
		return "", "", false
	}

	// Normalize: sort alleles (GA -> AG)
	if len(genotype) == 2 && genotype[0] > genotype[1] {
		genotype = string(genotype[1]) + string(genotype[0])
	}

	return rsid, genotype, true
}
|
|
|
|
// updateUploadStatus updates the status in the upload entry Data JSON
|
|
func updateUploadStatus(uploadID string, status string, details string) {
|
|
entry, err := lib.EntryGet(nil, uploadID) // nil ctx - internal operation
|
|
if err != nil || entry == nil {
|
|
return
|
|
}
|
|
// Parse existing data, update status, preserve other fields
|
|
var data map[string]interface{}
|
|
json.Unmarshal([]byte(entry.Data), &data)
|
|
if data == nil {
|
|
data = make(map[string]interface{})
|
|
}
|
|
data["status"] = status
|
|
if details != "" {
|
|
data["details"] = details
|
|
}
|
|
newData, _ := json.Marshal(data)
|
|
entry.Data = string(newData)
|
|
lib.EntryModify(entry)
|
|
}
|
|
|
|
// processGenomeUpload processes a genetics file in the background
|
|
func processGenomeUpload(uploadID string, dossierID string, filePath string) {
|
|
updateUploadStatus(uploadID, "processing", "")
|
|
startTime := time.Now()
|
|
|
|
// Read and decrypt
|
|
encrypted, err := os.ReadFile(filePath)
|
|
if err != nil {
|
|
updateUploadStatus(uploadID, "failed", "Read failed: "+err.Error())
|
|
return
|
|
}
|
|
|
|
decrypted, err := lib.CryptoDecryptBytes(encrypted)
|
|
if err != nil {
|
|
updateUploadStatus(uploadID, "failed", "Decrypt failed: "+err.Error())
|
|
return
|
|
}
|
|
|
|
// Parse variants
|
|
scanner := bufio.NewScanner(bytes.NewReader(decrypted))
|
|
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
|
|
|
|
var format string
|
|
var firstDataLine string
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if !strings.HasPrefix(line, "#") && len(line) > 0 {
|
|
firstDataLine = line
|
|
break
|
|
}
|
|
}
|
|
format = detectGenomeFormat(firstDataLine)
|
|
|
|
variants := make([]Variant, 0, 800000)
|
|
|
|
if rsid, geno, ok := parseGenomeVariant(firstDataLine, format); ok {
|
|
variants = append(variants, Variant{rsid, geno})
|
|
}
|
|
|
|
for scanner.Scan() {
|
|
if rsid, geno, ok := parseGenomeVariant(scanner.Text(), format); ok {
|
|
variants = append(variants, Variant{rsid, geno})
|
|
}
|
|
}
|
|
|
|
// Sort by rsid
|
|
sort.Slice(variants, func(i, j int) bool {
|
|
return variants[i].RSID < variants[j].RSID
|
|
})
|
|
|
|
// Load SNPedia data
|
|
snpediaPath := "/home/johan/dev/inou/snpedia-genotypes/genotypes.db"
|
|
type CatInfo struct {
|
|
Category string
|
|
Subcategory string
|
|
Gene string
|
|
Magnitude float64
|
|
Repute string
|
|
Summary string
|
|
}
|
|
// Key: rsid+genotype -> slice of category associations
|
|
snpediaMap := make(map[string][]CatInfo, 50000)
|
|
snpediaRsids := make(map[string]bool, 15000)
|
|
|
|
if snpDB, err := lib.OpenReadOnly(snpediaPath); err == nil {
|
|
rows, _ := snpDB.Query("SELECT rsid, genotype_norm, gene, magnitude, repute, summary, category, subcategory FROM genotypes")
|
|
if rows != nil {
|
|
for rows.Next() {
|
|
var rsid, geno string
|
|
var gene, repute, summary, cat, subcat sql.NullString
|
|
var mag float64
|
|
rows.Scan(&rsid, &geno, &gene, &mag, &repute, &summary, &cat, &subcat)
|
|
if cat.String == "" {
|
|
continue // skip entries without category
|
|
}
|
|
key := rsid + ":" + geno
|
|
snpediaMap[key] = append(snpediaMap[key], CatInfo{
|
|
Category: cat.String,
|
|
Subcategory: subcat.String,
|
|
Gene: gene.String,
|
|
Magnitude: mag,
|
|
Repute: repute.String,
|
|
Summary: summary.String,
|
|
})
|
|
snpediaRsids[rsid] = true
|
|
}
|
|
rows.Close()
|
|
}
|
|
snpDB.Close()
|
|
}
|
|
|
|
// Match variants (only those with rsid in SNPedia)
|
|
matched := make([]Variant, 0, len(snpediaRsids))
|
|
for _, v := range variants {
|
|
if snpediaRsids[v.RSID] {
|
|
matched = append(matched, v)
|
|
}
|
|
}
|
|
|
|
// Delete existing genome entries (all genome data uses CategoryGenome with different Types)
|
|
lib.EntryDeleteByCategory(dossierID, lib.CategoryGenome)
|
|
|
|
// Create extraction entry (tier 1)
|
|
now := time.Now().Unix()
|
|
totalAssoc := 0
|
|
for _, v := range matched {
|
|
totalAssoc += len(snpediaMap[v.RSID+":"+v.Genotype])
|
|
}
|
|
|
|
parentEntry := &lib.Entry{
|
|
DossierID: dossierID,
|
|
Category: lib.CategoryGenome,
|
|
Type: "extraction",
|
|
Value: format,
|
|
Timestamp: now,
|
|
Data: fmt.Sprintf(`{"upload_id":"%s","variants":%d,"associations":%d,"total_parsed":%d}`, uploadID, len(matched), totalAssoc, len(variants)),
|
|
}
|
|
lib.EntryAdd(parentEntry)
|
|
extractionID := parentEntry.EntryID
|
|
|
|
// Create category tiers (tier 2) - category name in Value, Type="tier"
|
|
tierMap := make(map[string]string) // category -> tier entry_id
|
|
|
|
for _, v := range matched {
|
|
for _, info := range snpediaMap[v.RSID+":"+v.Genotype] {
|
|
if _, exists := tierMap[info.Category]; !exists {
|
|
tierEntry := &lib.Entry{
|
|
DossierID: dossierID,
|
|
ParentID: extractionID,
|
|
Category: lib.CategoryGenome,
|
|
Type: "tier",
|
|
Value: info.Category, // "medication", "cardiovascular", etc.
|
|
Timestamp: now,
|
|
}
|
|
lib.EntryAdd(tierEntry)
|
|
tierMap[info.Category] = tierEntry.EntryID
|
|
}
|
|
}
|
|
}
|
|
|
|
// Batch insert variants (tier 3) - Type="rsid", Value=genotype
|
|
var batch []*lib.Entry
|
|
insertCount := 0
|
|
|
|
for _, v := range matched {
|
|
for _, info := range snpediaMap[v.RSID+":"+v.Genotype] {
|
|
tierID := tierMap[info.Category]
|
|
|
|
// data includes subcategory (plain text - lib.EntryAddBatch encrypts automatically)
|
|
data := fmt.Sprintf(`{"mag":%.1f,"rep":"%s","sum":"%s","sub":"%s"}`,
|
|
info.Magnitude, info.Repute, strings.ReplaceAll(info.Summary, `"`, `\"`), info.Subcategory)
|
|
|
|
batch = append(batch, &lib.Entry{
|
|
DossierID: dossierID,
|
|
ParentID: tierID,
|
|
Category: lib.CategoryGenome,
|
|
Type: v.RSID, // lib.EntryAddBatch encrypts
|
|
Value: v.Genotype, // lib.EntryAddBatch encrypts
|
|
Ordinal: insertCount,
|
|
Timestamp: now,
|
|
Tags: info.Gene, // lib.EntryAddBatch encrypts
|
|
Data: data, // lib.EntryAddBatch encrypts
|
|
})
|
|
insertCount++
|
|
|
|
if len(batch) >= 500 {
|
|
lib.EntryAddBatch(batch)
|
|
batch = batch[:0] // Reset slice
|
|
}
|
|
}
|
|
}
|
|
// Insert remaining entries
|
|
if len(batch) > 0 {
|
|
lib.EntryAddBatch(batch)
|
|
}
|
|
|
|
elapsed := time.Since(startTime)
|
|
details := fmt.Sprintf("parent=%s variants=%d parsed=%d elapsed=%v", extractionID, len(matched), len(variants), elapsed)
|
|
updateUploadStatus(uploadID, "completed", details)
|
|
lib.AuditLog(dossierID, "genome_import", dossierID, fmt.Sprintf("%d", len(matched)))
|
|
}
|