ScuttleBot

scuttlebot / internal / bots / herald / herald.go
Blame History Raw 302 lines
1
// Package herald implements the herald bot — alert and notification delivery.
2
//
3
// External systems push events to herald via Emit(); herald routes them to
4
// IRC channels based on event type. Supports agent mentions/highlights and
5
// rate limiting (burst allowed, sustained flood protection).
6
//
7
// Event routing is configured per-type in RouteConfig. Unrouted events are
8
// dropped with a warning log.
9
package herald
10
11
import (
12
"context"
13
"fmt"
14
"log/slog"
15
"net"
16
"strconv"
17
"strings"
18
"sync"
19
"time"
20
21
"github.com/lrstanley/girc"
22
23
"github.com/conflicthq/scuttlebot/internal/bots/cmdparse"
24
)
25
26
const botNick = "herald"
27
28
// Event is a notification pushed to herald for delivery.
29
type Event struct {
30
// Type identifies the event (e.g. "ci.build.failed", "deploy.complete").
31
Type string
32
33
// Channel overrides the default route for this event type.
34
// If empty, the RouteConfig default is used.
35
Channel string
36
37
// Message is the human-readable notification text.
38
Message string
39
40
// MentionNicks are agent nicks to highlight in the message.
41
MentionNicks []string
42
}
43
44
// RouteConfig maps event types to IRC channels.
45
type RouteConfig struct {
46
// Routes maps event type prefixes to channels.
47
// Key can be an exact type ("ci.build.failed") or a prefix ("ci.").
48
// Longest match wins.
49
Routes map[string]string
50
51
// DefaultChannel is used when no route matches.
52
// If empty, unrouted events are dropped.
53
DefaultChannel string
54
}
55
56
// RateLimiter is a simple token-bucket rate limiter.
57
type RateLimiter struct {
58
mu sync.Mutex
59
tokens float64
60
maxBurst float64
61
rate float64 // tokens per second
62
last time.Time
63
}
64
65
func newRateLimiter(ratePerSec float64, burst int) *RateLimiter {
66
return &RateLimiter{
67
tokens: float64(burst),
68
maxBurst: float64(burst),
69
rate: ratePerSec,
70
last: time.Now(),
71
}
72
}
73
74
// Allow returns true if a token is available.
75
func (r *RateLimiter) Allow() bool {
76
r.mu.Lock()
77
defer r.mu.Unlock()
78
now := time.Now()
79
elapsed := now.Sub(r.last).Seconds()
80
r.last = now
81
r.tokens = min(r.maxBurst, r.tokens+elapsed*r.rate)
82
if r.tokens >= 1 {
83
r.tokens--
84
return true
85
}
86
return false
87
}
88
89
// Bot is the herald bot.
90
type Bot struct {
91
ircAddr string
92
password string
93
channels []string
94
routes RouteConfig
95
limiter *RateLimiter
96
queue chan Event
97
log *slog.Logger
98
client *girc.Client
99
}
100
101
const defaultQueueSize = 256
102
103
// New creates a herald bot. ratePerSec and burst configure the token-bucket
104
// rate limiter (e.g. 5 messages/sec with burst of 20).
105
func New(ircAddr, password string, channels []string, routes RouteConfig, ratePerSec float64, burst int, log *slog.Logger) *Bot {
106
if ratePerSec <= 0 {
107
ratePerSec = 5
108
}
109
if burst <= 0 {
110
burst = 20
111
}
112
return &Bot{
113
ircAddr: ircAddr,
114
password: password,
115
channels: channels,
116
routes: routes,
117
limiter: newRateLimiter(ratePerSec, burst),
118
queue: make(chan Event, defaultQueueSize),
119
log: log,
120
}
121
}
122
123
// Name returns the bot's IRC nick.
124
func (b *Bot) Name() string { return botNick }
125
126
// Emit queues an event for delivery. Non-blocking: drops the event if the
127
// queue is full and logs a warning.
128
func (b *Bot) Emit(e Event) {
129
select {
130
case b.queue <- e:
131
default:
132
if b.log != nil {
133
b.log.Warn("herald: queue full, dropping event", "type", e.Type)
134
}
135
}
136
}
137
138
// Start connects to IRC and begins processing events. Blocks until ctx is cancelled.
139
func (b *Bot) Start(ctx context.Context) error {
140
host, port, err := splitHostPort(b.ircAddr)
141
if err != nil {
142
return fmt.Errorf("herald: parse irc addr: %w", err)
143
}
144
145
c := girc.New(girc.Config{
146
Server: host,
147
Port: port,
148
Nick: botNick,
149
User: botNick,
150
Name: "scuttlebot herald",
151
SASL: &girc.SASLPlain{User: botNick, Pass: b.password},
152
PingDelay: 30 * time.Second,
153
PingTimeout: 30 * time.Second,
154
SSL: false,
155
})
156
157
c.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, _ girc.Event) {
158
cl.Cmd.Mode(cl.GetNick(), "+B")
159
for _, ch := range b.channels {
160
cl.Cmd.Join(ch)
161
}
162
if b.log != nil {
163
b.log.Info("herald connected", "channels", b.channels)
164
}
165
})
166
167
c.Handlers.AddBg(girc.INVITE, func(cl *girc.Client, e girc.Event) {
168
if ch := e.Last(); strings.HasPrefix(ch, "#") {
169
cl.Cmd.Join(ch)
170
}
171
})
172
173
router := cmdparse.NewRouter(botNick)
174
router.Register(cmdparse.Command{
175
Name: "status",
176
Usage: "STATUS",
177
Description: "show webhook endpoint status and recent events",
178
Handler: func(_ *cmdparse.Context, _ string) string { return "not implemented yet" },
179
})
180
router.Register(cmdparse.Command{
181
Name: "test",
182
Usage: "TEST #channel",
183
Description: "send a test event to a channel",
184
Handler: func(_ *cmdparse.Context, _ string) string { return "not implemented yet" },
185
})
186
187
c.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) {
188
if len(e.Params) < 1 || e.Source == nil {
189
return
190
}
191
// Dispatch commands (DMs and channel messages).
192
if reply := router.Dispatch(e.Source.Name, e.Params[0], e.Last()); reply != nil {
193
cl.Cmd.Message(reply.Target, reply.Text)
194
return
195
}
196
})
197
198
b.client = c
199
200
errCh := make(chan error, 1)
201
go func() {
202
if err := c.Connect(); err != nil && ctx.Err() == nil {
203
errCh <- err
204
}
205
}()
206
207
// Event delivery loop.
208
go b.deliverLoop(ctx)
209
210
select {
211
case <-ctx.Done():
212
c.Close()
213
return nil
214
case err := <-errCh:
215
return fmt.Errorf("herald: irc connection: %w", err)
216
}
217
}
218
219
// Stop disconnects the bot.
220
func (b *Bot) Stop() {
221
if b.client != nil {
222
b.client.Close()
223
}
224
}
225
226
func (b *Bot) deliverLoop(ctx context.Context) {
227
for {
228
select {
229
case <-ctx.Done():
230
return
231
case evt := <-b.queue:
232
b.deliver(evt)
233
}
234
}
235
}
236
237
func (b *Bot) deliver(evt Event) {
238
channel := evt.Channel
239
if channel == "" {
240
channel = b.route(evt.Type)
241
}
242
if channel == "" {
243
if b.log != nil {
244
b.log.Warn("herald: no route for event, dropping", "type", evt.Type)
245
}
246
return
247
}
248
249
if !b.limiter.Allow() {
250
if b.log != nil {
251
b.log.Warn("herald: rate limited, dropping event", "type", evt.Type, "channel", channel)
252
}
253
return
254
}
255
256
msg := evt.Message
257
if len(evt.MentionNicks) > 0 {
258
msg = strings.Join(evt.MentionNicks, " ") + ": " + msg
259
}
260
261
irc := b.client
262
if irc != nil {
263
irc.Cmd.Message(channel, msg)
264
}
265
}
266
267
// route finds the best-matching channel for an event type.
268
// Longest prefix match wins.
269
func (b *Bot) route(eventType string) string {
270
best := ""
271
bestLen := -1
272
for prefix, ch := range b.routes.Routes {
273
if strings.HasPrefix(eventType, prefix) && len(prefix) > bestLen {
274
best = ch
275
bestLen = len(prefix)
276
}
277
}
278
if best != "" {
279
return best
280
}
281
return b.routes.DefaultChannel
282
}
283
284
func splitHostPort(addr string) (string, int, error) {
285
host, portStr, err := net.SplitHostPort(addr)
286
if err != nil {
287
return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
288
}
289
port, err := strconv.Atoi(portStr)
290
if err != nil {
291
return "", 0, fmt.Errorf("invalid port in %q: %w", addr, err)
292
}
293
return host, port, nil
294
}
295
296
func min(a, b float64) float64 {
297
if a < b {
298
return a
299
}
300
return b
301
}
302

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button