|
1
|
// Package herald implements the herald bot — alert and notification delivery.
//
// External systems push events to herald via Emit(); herald routes them to
// IRC channels based on event type. Supports agent mentions/highlights and
// rate limiting (burst allowed, sustained flood protection).
//
// Event routing is configured per-type in RouteConfig. Events that match no
// route go to RouteConfig.DefaultChannel; if that is empty they are dropped
// with a warning log.
|
9
|
package herald |
|
10
|
|
|
11
|
import ( |
|
12
|
"context" |
|
13
|
"fmt" |
|
14
|
"log/slog" |
|
15
|
"net" |
|
16
|
"strconv" |
|
17
|
"strings" |
|
18
|
"sync" |
|
19
|
"time" |
|
20
|
|
|
21
|
"github.com/lrstanley/girc" |
|
22
|
|
|
23
|
"github.com/conflicthq/scuttlebot/internal/bots/cmdparse" |
|
24
|
) |
|
25
|
|
|
26
|
// botNick is the fixed IRC nick herald uses; it also serves as the IRC user
// name and the SASL account name when connecting.
const botNick = "herald"
|
27
|
|
|
28
|
// Event is a single notification handed to herald for delivery to IRC.
type Event struct {
	// Type names the kind of event, e.g. "ci.build.failed" or
	// "deploy.complete". It selects the destination channel when Channel
	// is empty.
	Type string

	// Channel, when non-empty, is the destination channel and takes
	// precedence over any route configured for Type.
	Channel string

	// Message is the human-readable text of the notification.
	Message string

	// MentionNicks lists agent nicks to highlight. When non-empty they are
	// joined with spaces and prefixed to Message, followed by ": ".
	MentionNicks []string
}
|
43
|
|
|
44
|
// RouteConfig describes how event types are mapped onto IRC channels.
type RouteConfig struct {
	// Routes maps an event type, or a prefix of one, to a channel. A key may
	// be a full type ("ci.build.failed") or a prefix ("ci."); when several
	// keys match, the longest one is used.
	Routes map[string]string

	// DefaultChannel receives events that match no entry in Routes. Leave it
	// empty to have such events dropped.
	DefaultChannel string
}
|
55
|
|
|
56
|
// RateLimiter is a small token-bucket limiter: tokens refill continuously at
// a fixed rate up to a burst ceiling, and each permitted action spends one.
type RateLimiter struct {
	// mu guards every field below.
	mu sync.Mutex
	// tokens is the current (possibly fractional) bucket level.
	tokens float64
	// maxBurst is the ceiling tokens is clamped to when refilling.
	maxBurst float64
	// rate is the refill speed in tokens per second.
	rate float64
	// last is the time of the most recent refill calculation.
	last time.Time
}
|
64
|
|
|
65
|
func newRateLimiter(ratePerSec float64, burst int) *RateLimiter { |
|
66
|
return &RateLimiter{ |
|
67
|
tokens: float64(burst), |
|
68
|
maxBurst: float64(burst), |
|
69
|
rate: ratePerSec, |
|
70
|
last: time.Now(), |
|
71
|
} |
|
72
|
} |
|
73
|
|
|
74
|
// Allow returns true if a token is available. |
|
75
|
func (r *RateLimiter) Allow() bool { |
|
76
|
r.mu.Lock() |
|
77
|
defer r.mu.Unlock() |
|
78
|
now := time.Now() |
|
79
|
elapsed := now.Sub(r.last).Seconds() |
|
80
|
r.last = now |
|
81
|
r.tokens = min(r.maxBurst, r.tokens+elapsed*r.rate) |
|
82
|
if r.tokens >= 1 { |
|
83
|
r.tokens-- |
|
84
|
return true |
|
85
|
} |
|
86
|
return false |
|
87
|
} |
|
88
|
|
|
89
|
// Bot is the herald bot. |
|
90
|
type Bot struct { |
|
91
|
ircAddr string |
|
92
|
password string |
|
93
|
channels []string |
|
94
|
routes RouteConfig |
|
95
|
limiter *RateLimiter |
|
96
|
queue chan Event |
|
97
|
log *slog.Logger |
|
98
|
client *girc.Client |
|
99
|
} |
|
100
|
|
|
101
|
// defaultQueueSize is the capacity of the buffered event queue created by New.
// Emit drops events once this many are waiting for delivery.
const defaultQueueSize = 256
|
102
|
|
|
103
|
// New creates a herald bot. ratePerSec and burst configure the token-bucket |
|
104
|
// rate limiter (e.g. 5 messages/sec with burst of 20). |
|
105
|
func New(ircAddr, password string, channels []string, routes RouteConfig, ratePerSec float64, burst int, log *slog.Logger) *Bot { |
|
106
|
if ratePerSec <= 0 { |
|
107
|
ratePerSec = 5 |
|
108
|
} |
|
109
|
if burst <= 0 { |
|
110
|
burst = 20 |
|
111
|
} |
|
112
|
return &Bot{ |
|
113
|
ircAddr: ircAddr, |
|
114
|
password: password, |
|
115
|
channels: channels, |
|
116
|
routes: routes, |
|
117
|
limiter: newRateLimiter(ratePerSec, burst), |
|
118
|
queue: make(chan Event, defaultQueueSize), |
|
119
|
log: log, |
|
120
|
} |
|
121
|
} |
|
122
|
|
|
123
|
// Name returns the bot's IRC nick. |
|
124
|
func (b *Bot) Name() string { return botNick } |
|
125
|
|
|
126
|
// Emit queues an event for delivery. Non-blocking: drops the event if the |
|
127
|
// queue is full and logs a warning. |
|
128
|
func (b *Bot) Emit(e Event) { |
|
129
|
select { |
|
130
|
case b.queue <- e: |
|
131
|
default: |
|
132
|
if b.log != nil { |
|
133
|
b.log.Warn("herald: queue full, dropping event", "type", e.Type) |
|
134
|
} |
|
135
|
} |
|
136
|
} |
|
137
|
|
|
138
|
// Start connects to IRC and begins processing events. Blocks until ctx is cancelled. |
|
139
|
func (b *Bot) Start(ctx context.Context) error { |
|
140
|
host, port, err := splitHostPort(b.ircAddr) |
|
141
|
if err != nil { |
|
142
|
return fmt.Errorf("herald: parse irc addr: %w", err) |
|
143
|
} |
|
144
|
|
|
145
|
c := girc.New(girc.Config{ |
|
146
|
Server: host, |
|
147
|
Port: port, |
|
148
|
Nick: botNick, |
|
149
|
User: botNick, |
|
150
|
Name: "scuttlebot herald", |
|
151
|
SASL: &girc.SASLPlain{User: botNick, Pass: b.password}, |
|
152
|
PingDelay: 30 * time.Second, |
|
153
|
PingTimeout: 30 * time.Second, |
|
154
|
SSL: false, |
|
155
|
}) |
|
156
|
|
|
157
|
c.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, _ girc.Event) { |
|
158
|
cl.Cmd.Mode(cl.GetNick(), "+B") |
|
159
|
for _, ch := range b.channels { |
|
160
|
cl.Cmd.Join(ch) |
|
161
|
} |
|
162
|
if b.log != nil { |
|
163
|
b.log.Info("herald connected", "channels", b.channels) |
|
164
|
} |
|
165
|
}) |
|
166
|
|
|
167
|
c.Handlers.AddBg(girc.INVITE, func(cl *girc.Client, e girc.Event) { |
|
168
|
if ch := e.Last(); strings.HasPrefix(ch, "#") { |
|
169
|
cl.Cmd.Join(ch) |
|
170
|
} |
|
171
|
}) |
|
172
|
|
|
173
|
router := cmdparse.NewRouter(botNick) |
|
174
|
router.Register(cmdparse.Command{ |
|
175
|
Name: "status", |
|
176
|
Usage: "STATUS", |
|
177
|
Description: "show webhook endpoint status and recent events", |
|
178
|
Handler: func(_ *cmdparse.Context, _ string) string { return "not implemented yet" }, |
|
179
|
}) |
|
180
|
router.Register(cmdparse.Command{ |
|
181
|
Name: "test", |
|
182
|
Usage: "TEST #channel", |
|
183
|
Description: "send a test event to a channel", |
|
184
|
Handler: func(_ *cmdparse.Context, _ string) string { return "not implemented yet" }, |
|
185
|
}) |
|
186
|
|
|
187
|
c.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) { |
|
188
|
if len(e.Params) < 1 || e.Source == nil { |
|
189
|
return |
|
190
|
} |
|
191
|
// Dispatch commands (DMs and channel messages). |
|
192
|
if reply := router.Dispatch(e.Source.Name, e.Params[0], e.Last()); reply != nil { |
|
193
|
cl.Cmd.Message(reply.Target, reply.Text) |
|
194
|
return |
|
195
|
} |
|
196
|
}) |
|
197
|
|
|
198
|
b.client = c |
|
199
|
|
|
200
|
errCh := make(chan error, 1) |
|
201
|
go func() { |
|
202
|
if err := c.Connect(); err != nil && ctx.Err() == nil { |
|
203
|
errCh <- err |
|
204
|
} |
|
205
|
}() |
|
206
|
|
|
207
|
// Event delivery loop. |
|
208
|
go b.deliverLoop(ctx) |
|
209
|
|
|
210
|
select { |
|
211
|
case <-ctx.Done(): |
|
212
|
c.Close() |
|
213
|
return nil |
|
214
|
case err := <-errCh: |
|
215
|
return fmt.Errorf("herald: irc connection: %w", err) |
|
216
|
} |
|
217
|
} |
|
218
|
|
|
219
|
// Stop disconnects the bot. |
|
220
|
func (b *Bot) Stop() { |
|
221
|
if b.client != nil { |
|
222
|
b.client.Close() |
|
223
|
} |
|
224
|
} |
|
225
|
|
|
226
|
func (b *Bot) deliverLoop(ctx context.Context) { |
|
227
|
for { |
|
228
|
select { |
|
229
|
case <-ctx.Done(): |
|
230
|
return |
|
231
|
case evt := <-b.queue: |
|
232
|
b.deliver(evt) |
|
233
|
} |
|
234
|
} |
|
235
|
} |
|
236
|
|
|
237
|
func (b *Bot) deliver(evt Event) { |
|
238
|
channel := evt.Channel |
|
239
|
if channel == "" { |
|
240
|
channel = b.route(evt.Type) |
|
241
|
} |
|
242
|
if channel == "" { |
|
243
|
if b.log != nil { |
|
244
|
b.log.Warn("herald: no route for event, dropping", "type", evt.Type) |
|
245
|
} |
|
246
|
return |
|
247
|
} |
|
248
|
|
|
249
|
if !b.limiter.Allow() { |
|
250
|
if b.log != nil { |
|
251
|
b.log.Warn("herald: rate limited, dropping event", "type", evt.Type, "channel", channel) |
|
252
|
} |
|
253
|
return |
|
254
|
} |
|
255
|
|
|
256
|
msg := evt.Message |
|
257
|
if len(evt.MentionNicks) > 0 { |
|
258
|
msg = strings.Join(evt.MentionNicks, " ") + ": " + msg |
|
259
|
} |
|
260
|
|
|
261
|
irc := b.client |
|
262
|
if irc != nil { |
|
263
|
irc.Cmd.Message(channel, msg) |
|
264
|
} |
|
265
|
} |
|
266
|
|
|
267
|
// route finds the best-matching channel for an event type. |
|
268
|
// Longest prefix match wins. |
|
269
|
func (b *Bot) route(eventType string) string { |
|
270
|
best := "" |
|
271
|
bestLen := -1 |
|
272
|
for prefix, ch := range b.routes.Routes { |
|
273
|
if strings.HasPrefix(eventType, prefix) && len(prefix) > bestLen { |
|
274
|
best = ch |
|
275
|
bestLen = len(prefix) |
|
276
|
} |
|
277
|
} |
|
278
|
if best != "" { |
|
279
|
return best |
|
280
|
} |
|
281
|
return b.routes.DefaultChannel |
|
282
|
} |
|
283
|
|
|
284
|
// splitHostPort splits a "host:port" (or "[ipv6]:port") address into its host
// and numeric port. It returns an error if the address has no port, the port
// is not an integer, or the port lies outside the valid range 0-65535.
func splitHostPort(addr string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid port in %q: %w", addr, err)
	}
	// Atoi accepts negatives and arbitrarily large values; reject anything
	// that cannot be a TCP port instead of failing later at connect time.
	if port < 0 || port > 65535 {
		return "", 0, fmt.Errorf("invalid port in %q: %d out of range", addr, port)
	}
	return host, port, nil
}
|
295
|
|
|
296
|
// min returns a when a is strictly less than b, and b otherwise (including
// when the two are equal or the comparison is false because of NaN).
func min(a, b float64) float64 {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
|
302
|
|