ScuttleBot

scuttlebot / internal / bots / scribe / store.go
Source Blame History 373 lines
c12ba92… lmata 1 package scribe
c12ba92… lmata 2
c12ba92… lmata 3 import (
5ac549c… lmata 4 "bufio"
5ac549c… lmata 5 "encoding/json"
5ac549c… lmata 6 "fmt"
5ac549c… lmata 7 "os"
5ac549c… lmata 8 "path/filepath"
5ac549c… lmata 9 "strings"
c12ba92… lmata 10 "sync"
c12ba92… lmata 11 "time"
c12ba92… lmata 12 )
c12ba92… lmata 13
// EntryKind records which parsing path produced a log entry.
type EntryKind string

// Recognised EntryKind values.
const (
	// EntryKindEnvelope marks an entry that was parsed as a valid JSON envelope.
	EntryKindEnvelope EntryKind = "envelope"
	// EntryKindRaw marks an entry that could not be parsed and was logged as-is.
	EntryKindRaw EntryKind = "raw"
)
c12ba92… lmata 21
c12ba92… lmata 22 // Entry is a single structured log record written by scribe.
c12ba92… lmata 23 type Entry struct {
c12ba92… lmata 24 At time.Time `json:"at"`
c12ba92… lmata 25 Channel string `json:"channel"`
c12ba92… lmata 26 Nick string `json:"nick"`
c12ba92… lmata 27 Kind EntryKind `json:"kind"`
c12ba92… lmata 28 MessageType string `json:"message_type,omitempty"` // envelope type if Kind == envelope
c12ba92… lmata 29 MessageID string `json:"message_id,omitempty"` // envelope ID if Kind == envelope
c12ba92… lmata 30 Raw string `json:"raw"`
c12ba92… lmata 31 }
c12ba92… lmata 32
c12ba92… lmata 33 // Store is the storage backend for scribe log entries.
c12ba92… lmata 34 type Store interface {
c12ba92… lmata 35 Append(entry Entry) error
c12ba92… lmata 36 Query(channel string, limit int) ([]Entry, error)
c12ba92… lmata 37 }
c12ba92… lmata 38
c12ba92… lmata 39 // MemoryStore is an in-memory Store used for testing.
c12ba92… lmata 40 type MemoryStore struct {
c12ba92… lmata 41 mu sync.RWMutex
c12ba92… lmata 42 entries []Entry
c12ba92… lmata 43 }
c12ba92… lmata 44
c12ba92… lmata 45 func (s *MemoryStore) Append(entry Entry) error {
c12ba92… lmata 46 s.mu.Lock()
c12ba92… lmata 47 defer s.mu.Unlock()
c12ba92… lmata 48 s.entries = append(s.entries, entry)
c12ba92… lmata 49 return nil
c12ba92… lmata 50 }
c12ba92… lmata 51
c12ba92… lmata 52 func (s *MemoryStore) Query(channel string, limit int) ([]Entry, error) {
c12ba92… lmata 53 s.mu.RLock()
c12ba92… lmata 54 defer s.mu.RUnlock()
c12ba92… lmata 55
c12ba92… lmata 56 var out []Entry
c12ba92… lmata 57 for _, e := range s.entries {
c12ba92… lmata 58 if channel == "" || e.Channel == channel {
c12ba92… lmata 59 out = append(out, e)
c12ba92… lmata 60 }
c12ba92… lmata 61 }
c12ba92… lmata 62 if limit > 0 && len(out) > limit {
c12ba92… lmata 63 out = out[len(out)-limit:]
c12ba92… lmata 64 }
c12ba92… lmata 65 return out, nil
c12ba92… lmata 66 }
c12ba92… lmata 67
c12ba92… lmata 68 // All returns all entries (test helper).
c12ba92… lmata 69 func (s *MemoryStore) All() []Entry {
c12ba92… lmata 70 s.mu.RLock()
c12ba92… lmata 71 defer s.mu.RUnlock()
c12ba92… lmata 72 out := make([]Entry, len(s.entries))
c12ba92… lmata 73 copy(out, s.entries)
c12ba92… lmata 74 return out
5ac549c… lmata 75 }
5ac549c… lmata 76
5ac549c… lmata 77 // ---------------------------------------------------------------------------
5ac549c… lmata 78 // FileStore
5ac549c… lmata 79 // ---------------------------------------------------------------------------
5ac549c… lmata 80
// FileStoreConfig holds the settings that control how a FileStore names,
// formats, rotates and prunes its log files.
type FileStoreConfig struct {
	// Dir is the base directory; it is created on write if absent.
	Dir string
	// Format is "jsonl" (default), "csv" or "text".
	Format string
	// Rotation is "none" (default), "daily", "weekly" or "size".
	// "monthly" and "yearly" are also recognised when naming files.
	Rotation string
	// MaxSizeMB is the size rotation threshold in MiB; 0 means no limit.
	MaxSizeMB int
	// PerChannel selects one file per channel when true and a single
	// combined file when false.
	PerChannel bool
	// MaxAgeDays is the age in days beyond which PruneOld removes files;
	// 0 keeps everything.
	MaxAgeDays int
}
5ac549c… lmata 90
5ac549c… lmata 91 // FileStore writes log entries to rotating files on disk.
5ac549c… lmata 92 // It is safe for concurrent use.
5ac549c… lmata 93 type FileStore struct {
5ac549c… lmata 94 cfg FileStoreConfig
5ac549c… lmata 95 mu sync.Mutex
5ac549c… lmata 96 files map[string]*openFile // key: sanitized channel name or "_all"
5ac549c… lmata 97 }
5ac549c… lmata 98
// openFile is the bookkeeping FileStore keeps for one currently open log file.
type openFile struct {
	// f is the handle entries are appended to.
	f *os.File
	// size is the tracked number of bytes in the file, used for size rotation.
	size int64
	// bucket is the date bucket for time-based rotation
	// ("YYYY-MM-DD", "YYYY-Www").
	bucket string
	// path is the path of the current file (used for size rotation).
	path string
}
5ac549c… lmata 105
5ac549c… lmata 106 // NewFileStore creates a FileStore with the given config.
5ac549c… lmata 107 // Defaults: Format="jsonl", Rotation="none".
5ac549c… lmata 108 func NewFileStore(cfg FileStoreConfig) *FileStore {
5ac549c… lmata 109 if cfg.Format == "" {
5ac549c… lmata 110 cfg.Format = "jsonl"
5ac549c… lmata 111 }
5ac549c… lmata 112 if cfg.Rotation == "" {
5ac549c… lmata 113 cfg.Rotation = "none"
5ac549c… lmata 114 }
5ac549c… lmata 115 return &FileStore{
5ac549c… lmata 116 cfg: cfg,
5ac549c… lmata 117 files: make(map[string]*openFile),
5ac549c… lmata 118 }
5ac549c… lmata 119 }
5ac549c… lmata 120
5ac549c… lmata 121 func (s *FileStore) Append(entry Entry) error {
5ac549c… lmata 122 s.mu.Lock()
5ac549c… lmata 123 defer s.mu.Unlock()
5ac549c… lmata 124
5ac549c… lmata 125 if err := os.MkdirAll(s.cfg.Dir, 0755); err != nil {
5ac549c… lmata 126 return fmt.Errorf("scribe filestore: mkdir %s: %w", s.cfg.Dir, err)
5ac549c… lmata 127 }
5ac549c… lmata 128
5ac549c… lmata 129 key := "_all"
5ac549c… lmata 130 if s.cfg.PerChannel {
5ac549c… lmata 131 key = sanitizeChannel(entry.Channel)
5ac549c… lmata 132 }
5ac549c… lmata 133
5ac549c… lmata 134 of, err := s.getFile(key, entry.At)
5ac549c… lmata 135 if err != nil {
5ac549c… lmata 136 return err
5ac549c… lmata 137 }
5ac549c… lmata 138
5ac549c… lmata 139 line, err := s.formatEntry(entry)
5ac549c… lmata 140 if err != nil {
5ac549c… lmata 141 return err
5ac549c… lmata 142 }
5ac549c… lmata 143
5ac549c… lmata 144 n, err := fmt.Fprintln(of.f, line)
5ac549c… lmata 145 if err != nil {
5ac549c… lmata 146 return fmt.Errorf("scribe filestore: write: %w", err)
5ac549c… lmata 147 }
5ac549c… lmata 148 of.size += int64(n)
5ac549c… lmata 149 return nil
5ac549c… lmata 150 }
5ac549c… lmata 151
5ac549c… lmata 152 // getFile returns the open file for the given key, rotating if necessary.
5ac549c… lmata 153 // Caller must hold s.mu.
5ac549c… lmata 154 func (s *FileStore) getFile(key string, now time.Time) (*openFile, error) {
5ac549c… lmata 155 of := s.files[key]
5ac549c… lmata 156 bucket := s.timeBucket(now)
5ac549c… lmata 157
5ac549c… lmata 158 if of != nil {
5ac549c… lmata 159 needRotate := false
5ac549c… lmata 160 switch s.cfg.Rotation {
5ac549c… lmata 161 case "daily", "weekly":
5ac549c… lmata 162 needRotate = of.bucket != bucket
5ac549c… lmata 163 case "size":
5ac549c… lmata 164 if s.cfg.MaxSizeMB > 0 {
5ac549c… lmata 165 needRotate = of.size >= int64(s.cfg.MaxSizeMB)*1024*1024
5ac549c… lmata 166 }
5ac549c… lmata 167 }
5ac549c… lmata 168 if needRotate {
5ac549c… lmata 169 _ = of.f.Close()
5ac549c… lmata 170 if s.cfg.Rotation == "size" {
5ac549c… lmata 171 s.shiftSizeBackups(of.path)
5ac549c… lmata 172 }
5ac549c… lmata 173 of = nil
5ac549c… lmata 174 delete(s.files, key)
5ac549c… lmata 175 }
5ac549c… lmata 176 }
5ac549c… lmata 177
5ac549c… lmata 178 if of == nil {
5ac549c… lmata 179 path := s.filePath(key, now)
5ac549c… lmata 180 f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
5ac549c… lmata 181 if err != nil {
5ac549c… lmata 182 return nil, fmt.Errorf("scribe filestore: open %s: %w", path, err)
5ac549c… lmata 183 }
5ac549c… lmata 184 var size int64
5ac549c… lmata 185 if fi, err := f.Stat(); err == nil {
5ac549c… lmata 186 size = fi.Size()
5ac549c… lmata 187 }
5ac549c… lmata 188 of = &openFile{f: f, size: size, bucket: bucket, path: path}
5ac549c… lmata 189 s.files[key] = of
5ac549c… lmata 190 }
5ac549c… lmata 191 return of, nil
5ac549c… lmata 192 }
5ac549c… lmata 193
5ac549c… lmata 194 // shiftSizeBackups renames path → path.1, path.1 → path.2 … up to .10.
5ac549c… lmata 195 func (s *FileStore) shiftSizeBackups(path string) {
5ac549c… lmata 196 for i := 9; i >= 1; i-- {
5ac549c… lmata 197 _ = os.Rename(fmt.Sprintf("%s.%d", path, i), fmt.Sprintf("%s.%d", path, i+1))
5ac549c… lmata 198 }
5ac549c… lmata 199 _ = os.Rename(path, path+".1")
5ac549c… lmata 200 }
5ac549c… lmata 201
5ac549c… lmata 202 func (s *FileStore) timeBucket(t time.Time) string {
5ac549c… lmata 203 switch s.cfg.Rotation {
5ac549c… lmata 204 case "daily":
5ac549c… lmata 205 return t.Format("2006-01-02")
5ac549c… lmata 206 case "weekly":
5ac549c… lmata 207 year, week := t.ISOWeek()
5ac549c… lmata 208 return fmt.Sprintf("%04d-W%02d", year, week)
5ac549c… lmata 209 case "monthly":
5ac549c… lmata 210 return t.Format("2006-01")
5ac549c… lmata 211 case "yearly":
5ac549c… lmata 212 return t.Format("2006")
5ac549c… lmata 213 default:
5ac549c… lmata 214 return "current"
5ac549c… lmata 215 }
5ac549c… lmata 216 }
5ac549c… lmata 217
5ac549c… lmata 218 func (s *FileStore) filePath(key string, now time.Time) string {
5ac549c… lmata 219 extMap := map[string]string{"jsonl": ".jsonl", "csv": ".csv", "text": ".log"}
5ac549c… lmata 220 ext, ok := extMap[s.cfg.Format]
5ac549c… lmata 221 if !ok {
5ac549c… lmata 222 ext = ".jsonl"
5ac549c… lmata 223 }
5ac549c… lmata 224
5ac549c… lmata 225 var suffix string
5ac549c… lmata 226 switch s.cfg.Rotation {
5ac549c… lmata 227 case "daily":
5ac549c… lmata 228 suffix = "-" + now.Format("2006-01-02")
5ac549c… lmata 229 case "weekly":
5ac549c… lmata 230 year, week := now.ISOWeek()
5ac549c… lmata 231 suffix = fmt.Sprintf("-%04d-W%02d", year, week)
5ac549c… lmata 232 case "monthly":
5ac549c… lmata 233 suffix = "-" + now.Format("2006-01")
5ac549c… lmata 234 case "yearly":
5ac549c… lmata 235 suffix = "-" + now.Format("2006")
5ac549c… lmata 236 }
5ac549c… lmata 237 return filepath.Join(s.cfg.Dir, key+suffix+ext)
5ac549c… lmata 238 }
5ac549c… lmata 239
5ac549c… lmata 240 func (s *FileStore) formatEntry(e Entry) (string, error) {
5ac549c… lmata 241 switch s.cfg.Format {
5ac549c… lmata 242 case "csv":
5ac549c… lmata 243 return strings.Join([]string{
5ac549c… lmata 244 e.At.Format(time.RFC3339),
5ac549c… lmata 245 csvField(e.Channel),
5ac549c… lmata 246 csvField(e.Nick),
5ac549c… lmata 247 string(e.Kind),
5ac549c… lmata 248 csvField(e.MessageType),
5ac549c… lmata 249 csvField(e.MessageID),
5ac549c… lmata 250 csvField(e.Raw),
5ac549c… lmata 251 }, ","), nil
5ac549c… lmata 252 case "text":
5ac549c… lmata 253 return fmt.Sprintf("%s %s <%s> %s",
5ac549c… lmata 254 e.At.Format("2006-01-02T15:04:05"),
5ac549c… lmata 255 e.Channel, e.Nick, e.Raw), nil
5ac549c… lmata 256 default: // jsonl
5ac549c… lmata 257 b, err := json.Marshal(e)
5ac549c… lmata 258 return string(b), err
5ac549c… lmata 259 }
5ac549c… lmata 260 }
5ac549c… lmata 261
5ac549c… lmata 262 // Query returns the most recent entries from the current log file.
5ac549c… lmata 263 // Only supported for "jsonl" format; other formats return nil, nil.
5ac549c… lmata 264 func (s *FileStore) Query(channel string, limit int) ([]Entry, error) {
5ac549c… lmata 265 if s.cfg.Format != "jsonl" {
5ac549c… lmata 266 return nil, nil
5ac549c… lmata 267 }
5ac549c… lmata 268 s.mu.Lock()
5ac549c… lmata 269 path := s.filePath(keyFor(s.cfg.PerChannel, channel), time.Now())
5ac549c… lmata 270 s.mu.Unlock()
5ac549c… lmata 271
5ac549c… lmata 272 f, err := os.Open(path)
5ac549c… lmata 273 if os.IsNotExist(err) {
5ac549c… lmata 274 return nil, nil
5ac549c… lmata 275 }
5ac549c… lmata 276 if err != nil {
5ac549c… lmata 277 return nil, err
5ac549c… lmata 278 }
5ac549c… lmata 279 defer f.Close()
5ac549c… lmata 280
5ac549c… lmata 281 var entries []Entry
5ac549c… lmata 282 scanner := bufio.NewScanner(f)
5ac549c… lmata 283 for scanner.Scan() {
5ac549c… lmata 284 line := scanner.Bytes()
5ac549c… lmata 285 if len(line) == 0 {
5ac549c… lmata 286 continue
5ac549c… lmata 287 }
5ac549c… lmata 288 var e Entry
5ac549c… lmata 289 if json.Unmarshal(line, &e) != nil {
5ac549c… lmata 290 continue
5ac549c… lmata 291 }
5ac549c… lmata 292 if channel == "" || e.Channel == channel {
5ac549c… lmata 293 entries = append(entries, e)
5ac549c… lmata 294 }
5ac549c… lmata 295 }
5ac549c… lmata 296 if err := scanner.Err(); err != nil {
5ac549c… lmata 297 return nil, err
5ac549c… lmata 298 }
5ac549c… lmata 299 if limit > 0 && len(entries) > limit {
5ac549c… lmata 300 entries = entries[len(entries)-limit:]
5ac549c… lmata 301 }
5ac549c… lmata 302 return entries, nil
5ac549c… lmata 303 }
5ac549c… lmata 304
5ac549c… lmata 305 // Close flushes and closes all open file handles.
5ac549c… lmata 306 func (s *FileStore) Close() {
5ac549c… lmata 307 s.mu.Lock()
5ac549c… lmata 308 defer s.mu.Unlock()
5ac549c… lmata 309 for key, of := range s.files {
5ac549c… lmata 310 _ = of.f.Close()
5ac549c… lmata 311 delete(s.files, key)
5ac549c… lmata 312 }
5ac549c… lmata 313 }
5ac549c… lmata 314
5ac549c… lmata 315 // PruneOld removes log files whose modification time is older than MaxAgeDays.
5ac549c… lmata 316 // No-op if MaxAgeDays is 0 or Dir is empty.
5ac549c… lmata 317 func (s *FileStore) PruneOld() error {
5ac549c… lmata 318 if s.cfg.MaxAgeDays <= 0 || s.cfg.Dir == "" {
5ac549c… lmata 319 return nil
5ac549c… lmata 320 }
5ac549c… lmata 321 cutoff := time.Now().AddDate(0, 0, -s.cfg.MaxAgeDays)
5ac549c… lmata 322 entries, err := os.ReadDir(s.cfg.Dir)
5ac549c… lmata 323 if err != nil {
5ac549c… lmata 324 if os.IsNotExist(err) {
5ac549c… lmata 325 return nil
5ac549c… lmata 326 }
5ac549c… lmata 327 return err
5ac549c… lmata 328 }
5ac549c… lmata 329 for _, de := range entries {
5ac549c… lmata 330 if de.IsDir() {
5ac549c… lmata 331 continue
5ac549c… lmata 332 }
5ac549c… lmata 333 info, err := de.Info()
5ac549c… lmata 334 if err != nil {
5ac549c… lmata 335 continue
5ac549c… lmata 336 }
5ac549c… lmata 337 if info.ModTime().Before(cutoff) {
5ac549c… lmata 338 _ = os.Remove(filepath.Join(s.cfg.Dir, de.Name()))
5ac549c… lmata 339 }
5ac549c… lmata 340 }
5ac549c… lmata 341 return nil
5ac549c… lmata 342 }
5ac549c… lmata 343
5ac549c… lmata 344 // ---------------------------------------------------------------------------
5ac549c… lmata 345 // helpers
5ac549c… lmata 346 // ---------------------------------------------------------------------------
5ac549c… lmata 347
5ac549c… lmata 348 func keyFor(perChannel bool, channel string) string {
5ac549c… lmata 349 if perChannel && channel != "" {
5ac549c… lmata 350 return sanitizeChannel(channel)
5ac549c… lmata 351 }
5ac549c… lmata 352 return "_all"
5ac549c… lmata 353 }
5ac549c… lmata 354
// sanitizeChannel turns a channel name into a string usable as a file name:
// one leading "#" is removed and each of / \ : * ? " < > | is replaced by an
// underscore. All other characters are kept unchanged.
func sanitizeChannel(ch string) string {
	const unsafe = `/\:*?"<>|`
	replace := func(r rune) rune {
		if strings.ContainsRune(unsafe, r) {
			return '_'
		}
		return r
	}
	return strings.Map(replace, strings.TrimPrefix(ch, "#"))
}
5ac549c… lmata 366
// csvField returns s in a form that is safe to place between commas in a CSV
// record. A field containing a comma, a double quote, a line feed or a
// carriage return is wrapped in double quotes with embedded quotes doubled;
// any other field is returned unchanged.
//
// Carriage returns are included because a bare "\r" inside an unquoted field
// is read as (part of) a record terminator by CSV parsers.
func csvField(s string) string {
	if !strings.ContainsAny(s, "\",\r\n") {
		return s
	}
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button