|
1
|
package scribe |
|
2
|
|
|
3
|
import ( |
|
4
|
"bufio" |
|
5
|
"encoding/json" |
|
6
|
"fmt" |
|
7
|
"os" |
|
8
|
"path/filepath" |
|
9
|
"strings" |
|
10
|
"sync" |
|
11
|
"time" |
|
12
|
) |
|
13
|
|
|
14
|
// EntryKind describes how a log entry was parsed.
type EntryKind string

// Recognized entry kinds.
const (
	EntryKindEnvelope EntryKind = "envelope" // parsed as a valid JSON envelope
	EntryKindRaw      EntryKind = "raw"      // could not be parsed, logged as-is
)
|
21
|
|
|
22
|
// Entry is a single structured log record written by scribe.
type Entry struct {
	At          time.Time `json:"at"`                     // when the line was logged
	Channel     string    `json:"channel"`                // channel the line belongs to
	Nick        string    `json:"nick"`                   // nick the line is attributed to
	Kind        EntryKind `json:"kind"`                   // how the line was parsed (envelope/raw)
	MessageType string    `json:"message_type,omitempty"` // envelope type if Kind == envelope
	MessageID   string    `json:"message_id,omitempty"`   // envelope ID if Kind == envelope
	Raw         string    `json:"raw"`                    // the original line, as received
}
|
32
|
|
|
33
|
// Store is the storage backend for scribe log entries.
type Store interface {
	// Append persists a single entry.
	Append(entry Entry) error
	// Query returns up to limit of the most recent entries for channel;
	// an empty channel matches all channels, limit <= 0 means no limit.
	Query(channel string, limit int) ([]Entry, error)
}
|
38
|
|
|
39
|
// MemoryStore is an in-memory Store used for testing.
// The zero value is ready to use.
type MemoryStore struct {
	mu      sync.RWMutex // guards entries
	entries []Entry      // append-only log, oldest first
}
|
44
|
|
|
45
|
func (s *MemoryStore) Append(entry Entry) error { |
|
46
|
s.mu.Lock() |
|
47
|
defer s.mu.Unlock() |
|
48
|
s.entries = append(s.entries, entry) |
|
49
|
return nil |
|
50
|
} |
|
51
|
|
|
52
|
func (s *MemoryStore) Query(channel string, limit int) ([]Entry, error) { |
|
53
|
s.mu.RLock() |
|
54
|
defer s.mu.RUnlock() |
|
55
|
|
|
56
|
var out []Entry |
|
57
|
for _, e := range s.entries { |
|
58
|
if channel == "" || e.Channel == channel { |
|
59
|
out = append(out, e) |
|
60
|
} |
|
61
|
} |
|
62
|
if limit > 0 && len(out) > limit { |
|
63
|
out = out[len(out)-limit:] |
|
64
|
} |
|
65
|
return out, nil |
|
66
|
} |
|
67
|
|
|
68
|
// All returns all entries (test helper). |
|
69
|
func (s *MemoryStore) All() []Entry { |
|
70
|
s.mu.RLock() |
|
71
|
defer s.mu.RUnlock() |
|
72
|
out := make([]Entry, len(s.entries)) |
|
73
|
copy(out, s.entries) |
|
74
|
return out |
|
75
|
} |
|
76
|
|
|
77
|
// --------------------------------------------------------------------------- |
|
78
|
// FileStore |
|
79
|
// --------------------------------------------------------------------------- |
|
80
|
|
|
81
|
// FileStoreConfig controls FileStore behaviour.
type FileStoreConfig struct {
	Dir    string // base directory; created on first write if absent
	Format string // "jsonl" (default) | "csv" | "text"
	// Rotation selects the rotation policy.
	// NOTE(review): timeBucket/filePath also recognize "monthly" and
	// "yearly", but getFile never rotates on those buckets — confirm
	// whether they are meant to be supported values here.
	Rotation   string // "none" (default) | "daily" | "weekly" | "size"
	MaxSizeMB  int    // size rotation threshold in MiB; 0 = no limit (used only when Rotation == "size")
	PerChannel bool   // true: one file per channel; false: single combined file
	MaxAgeDays int    // PruneOld removes files in Dir older than N days; 0 = keep all
}
|
90
|
|
|
91
|
// FileStore writes log entries to rotating files on disk.
// It is safe for concurrent use.
type FileStore struct {
	cfg   FileStoreConfig
	mu    sync.Mutex           // guards files and all writes/rotations
	files map[string]*openFile // key: sanitized channel name or "_all"
}
|
98
|
|
|
99
|
// openFile tracks one currently-open log file plus the bookkeeping
// needed to decide when it must be rotated.
type openFile struct {
	f      *os.File
	size   int64  // bytes written so far (seeded from Stat on open)
	bucket string // date bucket for time-based rotation ("YYYY-MM-DD", "YYYY-Www")
	path   string // path of the current file (for size rotation)
}
|
105
|
|
|
106
|
// NewFileStore creates a FileStore with the given config. |
|
107
|
// Defaults: Format="jsonl", Rotation="none". |
|
108
|
func NewFileStore(cfg FileStoreConfig) *FileStore { |
|
109
|
if cfg.Format == "" { |
|
110
|
cfg.Format = "jsonl" |
|
111
|
} |
|
112
|
if cfg.Rotation == "" { |
|
113
|
cfg.Rotation = "none" |
|
114
|
} |
|
115
|
return &FileStore{ |
|
116
|
cfg: cfg, |
|
117
|
files: make(map[string]*openFile), |
|
118
|
} |
|
119
|
} |
|
120
|
|
|
121
|
func (s *FileStore) Append(entry Entry) error { |
|
122
|
s.mu.Lock() |
|
123
|
defer s.mu.Unlock() |
|
124
|
|
|
125
|
if err := os.MkdirAll(s.cfg.Dir, 0755); err != nil { |
|
126
|
return fmt.Errorf("scribe filestore: mkdir %s: %w", s.cfg.Dir, err) |
|
127
|
} |
|
128
|
|
|
129
|
key := "_all" |
|
130
|
if s.cfg.PerChannel { |
|
131
|
key = sanitizeChannel(entry.Channel) |
|
132
|
} |
|
133
|
|
|
134
|
of, err := s.getFile(key, entry.At) |
|
135
|
if err != nil { |
|
136
|
return err |
|
137
|
} |
|
138
|
|
|
139
|
line, err := s.formatEntry(entry) |
|
140
|
if err != nil { |
|
141
|
return err |
|
142
|
} |
|
143
|
|
|
144
|
n, err := fmt.Fprintln(of.f, line) |
|
145
|
if err != nil { |
|
146
|
return fmt.Errorf("scribe filestore: write: %w", err) |
|
147
|
} |
|
148
|
of.size += int64(n) |
|
149
|
return nil |
|
150
|
} |
|
151
|
|
|
152
|
// getFile returns the open file for the given key, rotating if necessary. |
|
153
|
// Caller must hold s.mu. |
|
154
|
func (s *FileStore) getFile(key string, now time.Time) (*openFile, error) { |
|
155
|
of := s.files[key] |
|
156
|
bucket := s.timeBucket(now) |
|
157
|
|
|
158
|
if of != nil { |
|
159
|
needRotate := false |
|
160
|
switch s.cfg.Rotation { |
|
161
|
case "daily", "weekly": |
|
162
|
needRotate = of.bucket != bucket |
|
163
|
case "size": |
|
164
|
if s.cfg.MaxSizeMB > 0 { |
|
165
|
needRotate = of.size >= int64(s.cfg.MaxSizeMB)*1024*1024 |
|
166
|
} |
|
167
|
} |
|
168
|
if needRotate { |
|
169
|
_ = of.f.Close() |
|
170
|
if s.cfg.Rotation == "size" { |
|
171
|
s.shiftSizeBackups(of.path) |
|
172
|
} |
|
173
|
of = nil |
|
174
|
delete(s.files, key) |
|
175
|
} |
|
176
|
} |
|
177
|
|
|
178
|
if of == nil { |
|
179
|
path := s.filePath(key, now) |
|
180
|
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) |
|
181
|
if err != nil { |
|
182
|
return nil, fmt.Errorf("scribe filestore: open %s: %w", path, err) |
|
183
|
} |
|
184
|
var size int64 |
|
185
|
if fi, err := f.Stat(); err == nil { |
|
186
|
size = fi.Size() |
|
187
|
} |
|
188
|
of = &openFile{f: f, size: size, bucket: bucket, path: path} |
|
189
|
s.files[key] = of |
|
190
|
} |
|
191
|
return of, nil |
|
192
|
} |
|
193
|
|
|
194
|
// shiftSizeBackups renames path → path.1, path.1 → path.2 … up to .10. |
|
195
|
func (s *FileStore) shiftSizeBackups(path string) { |
|
196
|
for i := 9; i >= 1; i-- { |
|
197
|
_ = os.Rename(fmt.Sprintf("%s.%d", path, i), fmt.Sprintf("%s.%d", path, i+1)) |
|
198
|
} |
|
199
|
_ = os.Rename(path, path+".1") |
|
200
|
} |
|
201
|
|
|
202
|
func (s *FileStore) timeBucket(t time.Time) string { |
|
203
|
switch s.cfg.Rotation { |
|
204
|
case "daily": |
|
205
|
return t.Format("2006-01-02") |
|
206
|
case "weekly": |
|
207
|
year, week := t.ISOWeek() |
|
208
|
return fmt.Sprintf("%04d-W%02d", year, week) |
|
209
|
case "monthly": |
|
210
|
return t.Format("2006-01") |
|
211
|
case "yearly": |
|
212
|
return t.Format("2006") |
|
213
|
default: |
|
214
|
return "current" |
|
215
|
} |
|
216
|
} |
|
217
|
|
|
218
|
func (s *FileStore) filePath(key string, now time.Time) string { |
|
219
|
extMap := map[string]string{"jsonl": ".jsonl", "csv": ".csv", "text": ".log"} |
|
220
|
ext, ok := extMap[s.cfg.Format] |
|
221
|
if !ok { |
|
222
|
ext = ".jsonl" |
|
223
|
} |
|
224
|
|
|
225
|
var suffix string |
|
226
|
switch s.cfg.Rotation { |
|
227
|
case "daily": |
|
228
|
suffix = "-" + now.Format("2006-01-02") |
|
229
|
case "weekly": |
|
230
|
year, week := now.ISOWeek() |
|
231
|
suffix = fmt.Sprintf("-%04d-W%02d", year, week) |
|
232
|
case "monthly": |
|
233
|
suffix = "-" + now.Format("2006-01") |
|
234
|
case "yearly": |
|
235
|
suffix = "-" + now.Format("2006") |
|
236
|
} |
|
237
|
return filepath.Join(s.cfg.Dir, key+suffix+ext) |
|
238
|
} |
|
239
|
|
|
240
|
func (s *FileStore) formatEntry(e Entry) (string, error) { |
|
241
|
switch s.cfg.Format { |
|
242
|
case "csv": |
|
243
|
return strings.Join([]string{ |
|
244
|
e.At.Format(time.RFC3339), |
|
245
|
csvField(e.Channel), |
|
246
|
csvField(e.Nick), |
|
247
|
string(e.Kind), |
|
248
|
csvField(e.MessageType), |
|
249
|
csvField(e.MessageID), |
|
250
|
csvField(e.Raw), |
|
251
|
}, ","), nil |
|
252
|
case "text": |
|
253
|
return fmt.Sprintf("%s %s <%s> %s", |
|
254
|
e.At.Format("2006-01-02T15:04:05"), |
|
255
|
e.Channel, e.Nick, e.Raw), nil |
|
256
|
default: // jsonl |
|
257
|
b, err := json.Marshal(e) |
|
258
|
return string(b), err |
|
259
|
} |
|
260
|
} |
|
261
|
|
|
262
|
// Query returns the most recent entries from the current log file. |
|
263
|
// Only supported for "jsonl" format; other formats return nil, nil. |
|
264
|
func (s *FileStore) Query(channel string, limit int) ([]Entry, error) { |
|
265
|
if s.cfg.Format != "jsonl" { |
|
266
|
return nil, nil |
|
267
|
} |
|
268
|
s.mu.Lock() |
|
269
|
path := s.filePath(keyFor(s.cfg.PerChannel, channel), time.Now()) |
|
270
|
s.mu.Unlock() |
|
271
|
|
|
272
|
f, err := os.Open(path) |
|
273
|
if os.IsNotExist(err) { |
|
274
|
return nil, nil |
|
275
|
} |
|
276
|
if err != nil { |
|
277
|
return nil, err |
|
278
|
} |
|
279
|
defer f.Close() |
|
280
|
|
|
281
|
var entries []Entry |
|
282
|
scanner := bufio.NewScanner(f) |
|
283
|
for scanner.Scan() { |
|
284
|
line := scanner.Bytes() |
|
285
|
if len(line) == 0 { |
|
286
|
continue |
|
287
|
} |
|
288
|
var e Entry |
|
289
|
if json.Unmarshal(line, &e) != nil { |
|
290
|
continue |
|
291
|
} |
|
292
|
if channel == "" || e.Channel == channel { |
|
293
|
entries = append(entries, e) |
|
294
|
} |
|
295
|
} |
|
296
|
if err := scanner.Err(); err != nil { |
|
297
|
return nil, err |
|
298
|
} |
|
299
|
if limit > 0 && len(entries) > limit { |
|
300
|
entries = entries[len(entries)-limit:] |
|
301
|
} |
|
302
|
return entries, nil |
|
303
|
} |
|
304
|
|
|
305
|
// Close flushes and closes all open file handles. |
|
306
|
func (s *FileStore) Close() { |
|
307
|
s.mu.Lock() |
|
308
|
defer s.mu.Unlock() |
|
309
|
for key, of := range s.files { |
|
310
|
_ = of.f.Close() |
|
311
|
delete(s.files, key) |
|
312
|
} |
|
313
|
} |
|
314
|
|
|
315
|
// PruneOld removes log files whose modification time is older than MaxAgeDays. |
|
316
|
// No-op if MaxAgeDays is 0 or Dir is empty. |
|
317
|
func (s *FileStore) PruneOld() error { |
|
318
|
if s.cfg.MaxAgeDays <= 0 || s.cfg.Dir == "" { |
|
319
|
return nil |
|
320
|
} |
|
321
|
cutoff := time.Now().AddDate(0, 0, -s.cfg.MaxAgeDays) |
|
322
|
entries, err := os.ReadDir(s.cfg.Dir) |
|
323
|
if err != nil { |
|
324
|
if os.IsNotExist(err) { |
|
325
|
return nil |
|
326
|
} |
|
327
|
return err |
|
328
|
} |
|
329
|
for _, de := range entries { |
|
330
|
if de.IsDir() { |
|
331
|
continue |
|
332
|
} |
|
333
|
info, err := de.Info() |
|
334
|
if err != nil { |
|
335
|
continue |
|
336
|
} |
|
337
|
if info.ModTime().Before(cutoff) { |
|
338
|
_ = os.Remove(filepath.Join(s.cfg.Dir, de.Name())) |
|
339
|
} |
|
340
|
} |
|
341
|
return nil |
|
342
|
} |
|
343
|
|
|
344
|
// --------------------------------------------------------------------------- |
|
345
|
// helpers |
|
346
|
// --------------------------------------------------------------------------- |
|
347
|
|
|
348
|
func keyFor(perChannel bool, channel string) string { |
|
349
|
if perChannel && channel != "" { |
|
350
|
return sanitizeChannel(channel) |
|
351
|
} |
|
352
|
return "_all" |
|
353
|
} |
|
354
|
|
|
355
|
// sanitizeChannel strips one leading "#" and replaces characters that are
// unsafe in file names with underscores.
func sanitizeChannel(ch string) string {
	const unsafe = `/\:*?"<>|`
	replace := func(r rune) rune {
		if strings.ContainsRune(unsafe, r) {
			return '_'
		}
		return r
	}
	return strings.Map(replace, strings.TrimPrefix(ch, "#"))
}
|
366
|
|
|
367
|
// csvField wraps a field in double-quotes if it contains a comma, quote,
// carriage return, or newline, doubling any embedded quotes (RFC 4180).
// The CR check was missing before, which let a bare "\r" corrupt a row.
func csvField(s string) string {
	if strings.ContainsAny(s, "\",\r\n") {
		return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
	}
	return s
}
|
374
|
|