ScuttleBot

scuttlebot / internal / bots / scribe / store.go
Blame History Raw 374 lines
1
package scribe
2
3
import (
4
"bufio"
5
"encoding/json"
6
"fmt"
7
"os"
8
"path/filepath"
9
"strings"
10
"sync"
11
"time"
12
)
13
14
// EntryKind describes how a log entry was parsed.
type EntryKind string

// The two possible parse outcomes for an incoming message.
const (
	EntryKindEnvelope EntryKind = "envelope" // parsed as a valid JSON envelope
	EntryKindRaw      EntryKind = "raw"      // could not be parsed, logged as-is
)
21
22
// Entry is a single structured log record written by scribe.
type Entry struct {
	At      time.Time `json:"at"`      // timestamp of the message
	Channel string    `json:"channel"` // channel the message was seen on
	Nick    string    `json:"nick"`    // sender nick
	Kind    EntryKind `json:"kind"`    // envelope vs raw (see EntryKind)
	MessageType string `json:"message_type,omitempty"` // envelope type if Kind == envelope
	MessageID   string `json:"message_id,omitempty"`   // envelope ID if Kind == envelope
	Raw         string `json:"raw"`                    // original message text, verbatim
}
32
33
// Store is the storage backend for scribe log entries.
type Store interface {
	// Append persists a single entry.
	Append(entry Entry) error
	// Query returns up to limit of the most recent entries for channel;
	// an empty channel matches all channels.
	Query(channel string, limit int) ([]Entry, error)
}
38
39
// MemoryStore is an in-memory Store used for testing.
// The zero value is ready to use.
type MemoryStore struct {
	mu      sync.RWMutex // guards entries
	entries []Entry      // append-only, in arrival order
}
44
45
// Append records entry in memory. It never fails.
func (s *MemoryStore) Append(entry Entry) error {
	s.mu.Lock()
	s.entries = append(s.entries, entry)
	s.mu.Unlock()
	return nil
}
51
52
// Query returns up to limit of the most recent entries, optionally
// filtered by channel. An empty channel matches every entry; a
// non-positive limit returns all matches.
func (s *MemoryStore) Query(channel string, limit int) ([]Entry, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var matched []Entry
	for _, entry := range s.entries {
		if channel != "" && entry.Channel != channel {
			continue
		}
		matched = append(matched, entry)
	}
	if limit > 0 && limit < len(matched) {
		// Keep only the newest entries (entries is in arrival order).
		matched = matched[len(matched)-limit:]
	}
	return matched, nil
}
67
68
// All returns all entries (test helper).
69
func (s *MemoryStore) All() []Entry {
70
s.mu.RLock()
71
defer s.mu.RUnlock()
72
out := make([]Entry, len(s.entries))
73
copy(out, s.entries)
74
return out
75
}
76
77
// ---------------------------------------------------------------------------
78
// FileStore
79
// ---------------------------------------------------------------------------
80
81
// FileStoreConfig controls FileStore behaviour.
//
// NOTE(review): timeBucket and filePath also recognize "monthly" and
// "yearly" rotation values even though they are not listed here —
// confirm which set is the supported contract.
type FileStoreConfig struct {
	Dir        string // base directory; created on first write if absent
	Format     string // "jsonl" (default) | "csv" | "text"
	Rotation   string // "none" (default) | "daily" | "weekly" | "size"
	MaxSizeMB  int    // size rotation threshold in MiB; 0 = no limit
	PerChannel bool   // true: one file per channel; false: single combined file
	MaxAgeDays int    // prune rotated files older than N days; 0 = keep all
}
90
91
// FileStore writes log entries to rotating files on disk.
// It is safe for concurrent use.
type FileStore struct {
	cfg   FileStoreConfig
	mu    sync.Mutex           // guards files and all file I/O
	files map[string]*openFile // key: sanitized channel name or "_all"
}
98
99
// openFile tracks one open log file handle and the state needed to
// decide when it must be rotated.
type openFile struct {
	f      *os.File
	size   int64  // bytes written so far (seeded from Stat on open)
	bucket string // date bucket for time-based rotation ("YYYY-MM-DD", "YYYY-Www")
	path   string // absolute path of the current file (for size rotation)
}
105
106
// NewFileStore creates a FileStore with the given config.
107
// Defaults: Format="jsonl", Rotation="none".
108
func NewFileStore(cfg FileStoreConfig) *FileStore {
109
if cfg.Format == "" {
110
cfg.Format = "jsonl"
111
}
112
if cfg.Rotation == "" {
113
cfg.Rotation = "none"
114
}
115
return &FileStore{
116
cfg: cfg,
117
files: make(map[string]*openFile),
118
}
119
}
120
121
// Append formats entry and writes it as one line to the appropriate
// log file, creating the base directory and rotating first if needed.
func (s *FileStore) Append(entry Entry) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if err := os.MkdirAll(s.cfg.Dir, 0755); err != nil {
		return fmt.Errorf("scribe filestore: mkdir %s: %w", s.cfg.Dir, err)
	}

	// One file per channel, or a single combined "_all" file.
	key := "_all"
	if s.cfg.PerChannel {
		key = sanitizeChannel(entry.Channel)
	}

	// Render the line first; it has no side effects on the file map.
	line, err := s.formatEntry(entry)
	if err != nil {
		return err
	}

	target, err := s.getFile(key, entry.At)
	if err != nil {
		return err
	}

	written, err := fmt.Fprintln(target.f, line)
	if err != nil {
		return fmt.Errorf("scribe filestore: write: %w", err)
	}
	// Track size (including the newline) for size-based rotation.
	target.size += int64(written)
	return nil
}
151
152
// getFile returns the open file for the given key, rotating if necessary.
153
// Caller must hold s.mu.
154
func (s *FileStore) getFile(key string, now time.Time) (*openFile, error) {
155
of := s.files[key]
156
bucket := s.timeBucket(now)
157
158
if of != nil {
159
needRotate := false
160
switch s.cfg.Rotation {
161
case "daily", "weekly":
162
needRotate = of.bucket != bucket
163
case "size":
164
if s.cfg.MaxSizeMB > 0 {
165
needRotate = of.size >= int64(s.cfg.MaxSizeMB)*1024*1024
166
}
167
}
168
if needRotate {
169
_ = of.f.Close()
170
if s.cfg.Rotation == "size" {
171
s.shiftSizeBackups(of.path)
172
}
173
of = nil
174
delete(s.files, key)
175
}
176
}
177
178
if of == nil {
179
path := s.filePath(key, now)
180
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
181
if err != nil {
182
return nil, fmt.Errorf("scribe filestore: open %s: %w", path, err)
183
}
184
var size int64
185
if fi, err := f.Stat(); err == nil {
186
size = fi.Size()
187
}
188
of = &openFile{f: f, size: size, bucket: bucket, path: path}
189
s.files[key] = of
190
}
191
return of, nil
192
}
193
194
// shiftSizeBackups renames path → path.1, path.1 → path.2 … up to .10.
195
func (s *FileStore) shiftSizeBackups(path string) {
196
for i := 9; i >= 1; i-- {
197
_ = os.Rename(fmt.Sprintf("%s.%d", path, i), fmt.Sprintf("%s.%d", path, i+1))
198
}
199
_ = os.Rename(path, path+".1")
200
}
201
202
func (s *FileStore) timeBucket(t time.Time) string {
203
switch s.cfg.Rotation {
204
case "daily":
205
return t.Format("2006-01-02")
206
case "weekly":
207
year, week := t.ISOWeek()
208
return fmt.Sprintf("%04d-W%02d", year, week)
209
case "monthly":
210
return t.Format("2006-01")
211
case "yearly":
212
return t.Format("2006")
213
default:
214
return "current"
215
}
216
}
217
218
func (s *FileStore) filePath(key string, now time.Time) string {
219
extMap := map[string]string{"jsonl": ".jsonl", "csv": ".csv", "text": ".log"}
220
ext, ok := extMap[s.cfg.Format]
221
if !ok {
222
ext = ".jsonl"
223
}
224
225
var suffix string
226
switch s.cfg.Rotation {
227
case "daily":
228
suffix = "-" + now.Format("2006-01-02")
229
case "weekly":
230
year, week := now.ISOWeek()
231
suffix = fmt.Sprintf("-%04d-W%02d", year, week)
232
case "monthly":
233
suffix = "-" + now.Format("2006-01")
234
case "yearly":
235
suffix = "-" + now.Format("2006")
236
}
237
return filepath.Join(s.cfg.Dir, key+suffix+ext)
238
}
239
240
func (s *FileStore) formatEntry(e Entry) (string, error) {
241
switch s.cfg.Format {
242
case "csv":
243
return strings.Join([]string{
244
e.At.Format(time.RFC3339),
245
csvField(e.Channel),
246
csvField(e.Nick),
247
string(e.Kind),
248
csvField(e.MessageType),
249
csvField(e.MessageID),
250
csvField(e.Raw),
251
}, ","), nil
252
case "text":
253
return fmt.Sprintf("%s %s <%s> %s",
254
e.At.Format("2006-01-02T15:04:05"),
255
e.Channel, e.Nick, e.Raw), nil
256
default: // jsonl
257
b, err := json.Marshal(e)
258
return string(b), err
259
}
260
}
261
262
// Query returns the most recent entries from the current log file.
263
// Only supported for "jsonl" format; other formats return nil, nil.
264
func (s *FileStore) Query(channel string, limit int) ([]Entry, error) {
265
if s.cfg.Format != "jsonl" {
266
return nil, nil
267
}
268
s.mu.Lock()
269
path := s.filePath(keyFor(s.cfg.PerChannel, channel), time.Now())
270
s.mu.Unlock()
271
272
f, err := os.Open(path)
273
if os.IsNotExist(err) {
274
return nil, nil
275
}
276
if err != nil {
277
return nil, err
278
}
279
defer f.Close()
280
281
var entries []Entry
282
scanner := bufio.NewScanner(f)
283
for scanner.Scan() {
284
line := scanner.Bytes()
285
if len(line) == 0 {
286
continue
287
}
288
var e Entry
289
if json.Unmarshal(line, &e) != nil {
290
continue
291
}
292
if channel == "" || e.Channel == channel {
293
entries = append(entries, e)
294
}
295
}
296
if err := scanner.Err(); err != nil {
297
return nil, err
298
}
299
if limit > 0 && len(entries) > limit {
300
entries = entries[len(entries)-limit:]
301
}
302
return entries, nil
303
}
304
305
// Close flushes and closes all open file handles.
306
func (s *FileStore) Close() {
307
s.mu.Lock()
308
defer s.mu.Unlock()
309
for key, of := range s.files {
310
_ = of.f.Close()
311
delete(s.files, key)
312
}
313
}
314
315
// PruneOld removes log files whose modification time is older than MaxAgeDays.
316
// No-op if MaxAgeDays is 0 or Dir is empty.
317
func (s *FileStore) PruneOld() error {
318
if s.cfg.MaxAgeDays <= 0 || s.cfg.Dir == "" {
319
return nil
320
}
321
cutoff := time.Now().AddDate(0, 0, -s.cfg.MaxAgeDays)
322
entries, err := os.ReadDir(s.cfg.Dir)
323
if err != nil {
324
if os.IsNotExist(err) {
325
return nil
326
}
327
return err
328
}
329
for _, de := range entries {
330
if de.IsDir() {
331
continue
332
}
333
info, err := de.Info()
334
if err != nil {
335
continue
336
}
337
if info.ModTime().Before(cutoff) {
338
_ = os.Remove(filepath.Join(s.cfg.Dir, de.Name()))
339
}
340
}
341
return nil
342
}
343
344
// ---------------------------------------------------------------------------
345
// helpers
346
// ---------------------------------------------------------------------------
347
348
func keyFor(perChannel bool, channel string) string {
349
if perChannel && channel != "" {
350
return sanitizeChannel(channel)
351
}
352
return "_all"
353
}
354
355
// sanitizeChannel strips a leading "#" and replaces characters that
// are unsafe in file names with underscores.
func sanitizeChannel(ch string) string {
	const unsafe = `/\:*?"<>|`
	replace := func(r rune) rune {
		if strings.ContainsRune(unsafe, r) {
			return '_'
		}
		return r
	}
	return strings.Map(replace, strings.TrimPrefix(ch, "#"))
}
366
367
// csvField wraps s in double-quotes if it contains a comma, quote, or
// line break (LF or CR), doubling any embedded quotes per RFC 4180;
// otherwise it returns s unchanged.
func csvField(s string) string {
	// BUGFIX: '\r' must also force quoting — RFC 4180 requires fields
	// containing CR to be enclosed in double-quotes.
	if strings.ContainsAny(s, "\",\n\r") {
		return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
	}
	return s
}
374

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button