Files
pedestrian-simulator/server/step_manager.go

406 lines
12 KiB
Go
Raw Normal View History

2026-01-11 17:16:59 -07:00
package main
import (
"database/sql"
2026-01-11 17:16:59 -07:00
"fmt"
"sync"
"time"
)
const METERS_PER_STEP = 0.762 // Match frontend
2026-01-11 17:16:59 -07:00
type TripState struct {
StartDate string `json:"start_date"` // YYYY-MM-DD
StartTime time.Time `json:"start_time"` // Exact start time
StartDayInitialSteps int `json:"start_day_initial_steps"` // Steps on the tracker when trip started
DailyCache map[string]int `json:"daily_cache"` // Cache of steps for past days
// Trip Metadata
TripType string `json:"trip_type"`
RouteName string `json:"route_name"`
StartAddress string `json:"start_address"`
EndAddress string `json:"end_address"`
KmlID *int `json:"kml_id"`
TotalDistance float64 `json:"total_distance"` // in km
2026-01-11 17:16:59 -07:00
}
type StepManager struct {
mu sync.Mutex
userID string // Fitbit user ID
tripState TripState
// Smoothing State
previousTotalSteps int // What we last told the client (or where we started smoothing from)
targetTotalSteps int // The actual total steps we just fetched/calculated
lastSyncTime time.Time
nextSyncTime time.Time
syncInterval time.Duration
}
func NewStepManager(userID string) *StepManager {
now := time.Now()
2026-01-11 17:16:59 -07:00
sm := &StepManager{
userID: userID,
syncInterval: 15 * time.Minute,
lastSyncTime: now.Add(-15 * time.Minute),
nextSyncTime: now.Add(24 * time.Hour), // Don't sync unless a trip is started or loaded
2026-01-11 17:16:59 -07:00
}
sm.LoadTripState()
2026-01-11 17:16:59 -07:00
return sm
}
func (sm *StepManager) LoadTripState() error {
sm.mu.Lock()
defer sm.mu.Unlock()
return sm.loadTripStateLocked()
}
func (sm *StepManager) loadTripStateLocked() error {
var startTime time.Time
var kmlID sql.NullInt64
err := db.QueryRow(`
SELECT start_date, start_time, start_day_initial_steps, previous_total_steps, target_total_steps, last_sync_time, next_sync_time,
trip_type, route_name, start_address, end_address, kml_id, total_distance
FROM trips WHERE user_id = ?
`, sm.userID).Scan(
&sm.tripState.StartDate, &startTime, &sm.tripState.StartDayInitialSteps,
&sm.previousTotalSteps, &sm.targetTotalSteps, &sm.lastSyncTime, &sm.nextSyncTime,
&sm.tripState.TripType, &sm.tripState.RouteName, &sm.tripState.StartAddress, &sm.tripState.EndAddress, &kmlID, &sm.tripState.TotalDistance,
)
if err == nil && kmlID.Valid {
id := int(kmlID.Int64)
sm.tripState.KmlID = &id
}
2026-01-11 17:16:59 -07:00
if err != nil {
if err == sql.ErrNoRows {
// Initialize with defaults if no trip exists
now := time.Now()
sm.tripState.StartDate = "" // No active trip
sm.nextSyncTime = now.Add(24 * time.Hour)
return nil
2026-01-11 17:16:59 -07:00
}
return err
}
sm.tripState.StartTime = startTime
2026-01-11 17:16:59 -07:00
// Load daily cache from DB
rows, err := db.Query("SELECT date, steps FROM daily_steps WHERE user_id = ?", sm.userID)
if err != nil {
return err
2026-01-11 17:16:59 -07:00
}
defer rows.Close()
2026-01-11 17:16:59 -07:00
sm.tripState.DailyCache = make(map[string]int)
for rows.Next() {
var date string
var steps int
if err := rows.Scan(&date, &steps); err != nil {
return err
}
sm.tripState.DailyCache[date] = steps
2026-01-11 17:16:59 -07:00
}
2026-01-11 17:16:59 -07:00
return nil
}
func (sm *StepManager) SaveTripState() {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.saveTripStateLocked()
}
func (sm *StepManager) saveTripStateLocked() {
_, err := db.Exec(`
INSERT INTO trips (user_id, start_date, start_time, start_day_initial_steps, previous_total_steps, target_total_steps, last_sync_time, next_sync_time,
trip_type, route_name, start_address, end_address, kml_id, total_distance)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
start_date = VALUES(start_date),
start_time = VALUES(start_time),
start_day_initial_steps = VALUES(start_day_initial_steps),
previous_total_steps = VALUES(previous_total_steps),
target_total_steps = VALUES(target_total_steps),
last_sync_time = VALUES(last_sync_time),
next_sync_time = VALUES(next_sync_time),
trip_type = VALUES(trip_type),
route_name = VALUES(route_name),
start_address = VALUES(start_address),
end_address = VALUES(end_address),
kml_id = VALUES(kml_id),
total_distance = VALUES(total_distance)
`, sm.userID, sm.tripState.StartDate, sm.tripState.StartTime, sm.tripState.StartDayInitialSteps,
sm.previousTotalSteps, sm.targetTotalSteps, sm.lastSyncTime, sm.nextSyncTime,
sm.tripState.TripType, sm.tripState.RouteName, sm.tripState.StartAddress, sm.tripState.EndAddress, sm.tripState.KmlID, sm.tripState.TotalDistance)
if err != nil {
fmt.Printf("Error saving trip state: %v\n", err)
}
// Save daily cache
for date, steps := range sm.tripState.DailyCache {
_, err := db.Exec(`
INSERT INTO daily_steps (user_id, date, steps)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE steps = VALUES(steps)
`, sm.userID, date, steps)
if err != nil {
fmt.Printf("Error saving daily steps for %s: %v\n", date, err)
}
2026-01-11 17:16:59 -07:00
}
}
func (sm *StepManager) StartNewTrip(metadata TripState) {
2026-01-11 17:16:59 -07:00
sm.mu.Lock()
defer sm.mu.Unlock()
now := time.Now()
initialSteps, err := GetDailySteps(sm.userID, now.Format("2006-01-02"))
if err != nil {
initialSteps = 0
fmt.Printf("Error fetching initial steps: %v\n", err)
}
sm.tripState = TripState{
StartDate: now.Format("2006-01-02"),
StartTime: now,
StartDayInitialSteps: initialSteps,
DailyCache: make(map[string]int),
TripType: metadata.TripType,
RouteName: metadata.RouteName,
StartAddress: metadata.StartAddress,
EndAddress: metadata.EndAddress,
KmlID: metadata.KmlID,
TotalDistance: metadata.TotalDistance,
2026-01-11 17:16:59 -07:00
}
// On new trip, previous total is 0
sm.previousTotalSteps = 0
sm.targetTotalSteps = 0
sm.saveTripStateLocked()
2026-01-11 17:16:59 -07:00
// Trigger immediate sync to set baseline
go sm.Sync()
}
// Sync fetches data for all days in the trip using the default interval
func (sm *StepManager) Sync() {
sm.performSync(sm.syncInterval)
}
// Drain fetches data and sets a short sync interval to fast-forward interpolation
func (sm *StepManager) Drain() {
sm.performSync(30 * time.Second)
}
// performSync implementation
func (sm *StepManager) performSync(interval time.Duration) {
sm.mu.Lock()
tripStateCopy := sm.tripState
// Deep copy the map to avoid data races during async network calls
newDailyCache := make(map[string]int)
for k, v := range tripStateCopy.DailyCache {
newDailyCache[k] = v
}
sm.mu.Unlock()
totalSteps := 0
today := time.Now().Format("2006-01-02")
// Parse start date in local time
if tripStateCopy.StartDate == "" {
fmt.Println("[StepManager] Skipping sync: No active trip start date")
sm.mu.Lock()
sm.nextSyncTime = time.Now().Add(1 * time.Hour) // Check again in an hour
sm.mu.Unlock()
return
}
start, err := time.ParseInLocation("2006-01-02", tripStateCopy.StartDate, time.Local)
if err != nil {
fmt.Printf("[StepManager] ERROR: Invalid start date '%s': %v\n", tripStateCopy.StartDate, err)
return
}
end, _ := time.ParseInLocation("2006-01-02", today, time.Local)
2026-01-11 17:16:59 -07:00
// Iterate from Start Date to Today
for d := start; !d.After(end); d = d.AddDate(0, 0, 1) {
dateStr := d.Format("2006-01-02")
var steps int
var err error
// Check cache first for past days
// For today, we always fetch.
// Ideally we might trust cache for past days, but re-checking isn't bad if we want to catch up.
// The current logic trusts cache for past days.
shouldFetch := (dateStr == today)
if !shouldFetch {
if cached, ok := newDailyCache[dateStr]; ok {
steps = cached
} else {
shouldFetch = true
}
}
if shouldFetch {
steps, err = GetDailySteps(sm.userID, dateStr)
if err != nil {
fmt.Printf("Error fetching steps for %s: %v\n", dateStr, err)
return // Don't proceed with sync if fetch fails
}
// Update the local cache
newDailyCache[dateStr] = steps
}
// Calculate contribution to total
if dateStr == tripStateCopy.StartDate {
// Substract the steps that were already there when we started
contribution := steps - tripStateCopy.StartDayInitialSteps
if contribution < 0 {
contribution = 0
}
totalSteps += contribution
} else {
totalSteps += steps
}
}
sm.mu.Lock()
defer sm.mu.Unlock()
// Update State
sm.tripState.DailyCache = newDailyCache
sm.saveTripStateLocked()
2026-01-11 17:16:59 -07:00
// Update Smoothing Targets
sm.previousTotalSteps = sm.calculateSmoothedTokenAt(time.Now()) // Snapshot current interpolated value as new start
sm.targetTotalSteps = totalSteps
sm.lastSyncTime = time.Now()
sm.nextSyncTime = time.Now().Add(interval)
_, err = db.Exec(`
UPDATE trips SET
last_sync_time = NOW(),
next_sync_time = DATE_ADD(NOW(), INTERVAL ? SECOND),
previous_total_steps = ?,
target_total_steps = ?
WHERE user_id = ?
`, interval.Seconds(), sm.previousTotalSteps, sm.targetTotalSteps, sm.userID)
if err != nil {
fmt.Printf("Error saving sync completion: %v\n", err)
}
// Check for Trip Completion
if sm.tripState.TotalDistance > 0 {
currentDistance := float64(sm.targetTotalSteps) * METERS_PER_STEP / 1000.0
if currentDistance >= sm.tripState.TotalDistance {
fmt.Printf("[StepManager] Trip Fulfillment Detected! %.2f / %.2f km\n", currentDistance, sm.tripState.TotalDistance)
sm.recordTripCompletionLocked()
}
}
2026-01-11 17:16:59 -07:00
fmt.Printf("Sync Complete. Total Trip Steps: %d\n", sm.targetTotalSteps)
}
func (sm *StepManager) recordTripCompletionLocked() {
// 1. Insert into completed_trips
_, err := db.Exec(`
INSERT INTO completed_trips (user_id, trip_type, route_name, start_address, end_address, kml_id, distance)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, sm.userID, sm.tripState.TripType, sm.tripState.RouteName, sm.tripState.StartAddress, sm.tripState.EndAddress, sm.tripState.KmlID, sm.tripState.TotalDistance)
if err != nil {
fmt.Printf("[StepManager] Error recording completion: %v\n", err)
return
}
// 2. Delete from active trips
_, err = db.Exec("DELETE FROM trips WHERE user_id = ?", sm.userID)
if err != nil {
fmt.Printf("[StepManager] Error clearing active trip: %v\n", err)
}
// 3. Reset local state
sm.tripState.StartDate = ""
sm.previousTotalSteps = 0
sm.targetTotalSteps = 0
}
2026-01-11 17:16:59 -07:00
// calculateSmoothedTokenAt returns the interpolated step count at a given time
func (sm *StepManager) calculateSmoothedTokenAt(t time.Time) int {
totalDuration := sm.nextSyncTime.Sub(sm.lastSyncTime)
elapsed := t.Sub(sm.lastSyncTime)
if totalDuration <= 0 {
return sm.targetTotalSteps
}
progress := float64(elapsed) / float64(totalDuration)
if progress < 0 {
progress = 0
}
if progress > 1 {
progress = 1
}
// Linear interpolation from Previous -> Target
delta := sm.targetTotalSteps - sm.previousTotalSteps
return sm.previousTotalSteps + int(float64(delta)*progress)
}
func (sm *StepManager) GetStatus() map[string]interface{} {
// Reload from DB to get latest sync results from other instances
sm.LoadTripState()
sm.mu.Lock()
shouldSync := time.Now().After(sm.nextSyncTime)
nextSyncMilli := sm.nextSyncTime.UnixMilli()
2026-01-11 17:16:59 -07:00
currentSmoothed := sm.calculateSmoothedTokenAt(time.Now())
sm.mu.Unlock()
// Auto-trigger sync if needed (out of initial lock to avoid deadlock)
if shouldSync {
go sm.Sync() // Async sync
}
2026-01-11 17:16:59 -07:00
res := map[string]interface{}{
2026-01-11 17:16:59 -07:00
"tripSteps": currentSmoothed,
"nextSyncTime": nextSyncMilli,
2026-01-11 17:16:59 -07:00
}
sm.mu.Lock()
if sm.tripState.StartDate != "" {
res["activeTrip"] = map[string]interface{}{
"trip_type": sm.tripState.TripType,
"route_name": sm.tripState.RouteName,
"start_address": sm.tripState.StartAddress,
"end_address": sm.tripState.EndAddress,
"kml_id": sm.tripState.KmlID,
"total_distance": sm.tripState.TotalDistance,
"start_time": sm.tripState.StartTime,
}
}
sm.mu.Unlock()
return res
2026-01-11 17:16:59 -07:00
}
// RecalculateTotalFromState sums up the steps from the DailyCache without making external API calls
func (sm *StepManager) RecalculateTotalFromState() int {
total := 0
for dateStr, steps := range sm.tripState.DailyCache {
// YYYY-MM-DD string comparison works for chronological order
if dateStr < sm.tripState.StartDate {
continue
}
if dateStr == sm.tripState.StartDate {
contribution := steps - sm.tripState.StartDayInitialSteps
if contribution < 0 {
contribution = 0
}
total += contribution
} else {
total += steps
}
}
return total
}