ScuttleBot

scuttlebot / internal / bots / herald / herald.go
Source Blame History 301 lines
8fe9b10… lmata 1 // Package herald implements the herald bot — alert and notification delivery.
8fe9b10… lmata 2 //
8fe9b10… lmata 3 // External systems push events to herald via Emit(); herald routes them to
8fe9b10… lmata 4 // IRC channels based on event type. Supports agent mentions/highlights and
8fe9b10… lmata 5 // rate limiting (burst allowed, sustained flood protection).
8fe9b10… lmata 6 //
8fe9b10… lmata 7 // Event routing is configured per-type in RouteConfig. Unrouted events are
8fe9b10… lmata 8 // dropped with a warning log.
8fe9b10… lmata 9 package herald
8fe9b10… lmata 10
8fe9b10… lmata 11 import (
8fe9b10… lmata 12 "context"
8fe9b10… lmata 13 "fmt"
8fe9b10… lmata 14 "log/slog"
18e8fef… lmata 15 "net"
18e8fef… lmata 16 "strconv"
8fe9b10… lmata 17 "strings"
8fe9b10… lmata 18 "sync"
8fe9b10… lmata 19 "time"
8fe9b10… lmata 20
8fe9b10… lmata 21 "github.com/lrstanley/girc"
e8d318d… noreply 22
e8d318d… noreply 23 "github.com/conflicthq/scuttlebot/internal/bots/cmdparse"
8fe9b10… lmata 24 )
8fe9b10… lmata 25
8fe9b10… lmata 26 const botNick = "herald"
8fe9b10… lmata 27
8fe9b10… lmata 28 // Event is a notification pushed to herald for delivery.
8fe9b10… lmata 29 type Event struct {
8fe9b10… lmata 30 // Type identifies the event (e.g. "ci.build.failed", "deploy.complete").
8fe9b10… lmata 31 Type string
8fe9b10… lmata 32
8fe9b10… lmata 33 // Channel overrides the default route for this event type.
8fe9b10… lmata 34 // If empty, the RouteConfig default is used.
8fe9b10… lmata 35 Channel string
8fe9b10… lmata 36
8fe9b10… lmata 37 // Message is the human-readable notification text.
8fe9b10… lmata 38 Message string
8fe9b10… lmata 39
8fe9b10… lmata 40 // MentionNicks are agent nicks to highlight in the message.
8fe9b10… lmata 41 MentionNicks []string
8fe9b10… lmata 42 }
8fe9b10… lmata 43
8fe9b10… lmata 44 // RouteConfig maps event types to IRC channels.
8fe9b10… lmata 45 type RouteConfig struct {
8fe9b10… lmata 46 // Routes maps event type prefixes to channels.
8fe9b10… lmata 47 // Key can be an exact type ("ci.build.failed") or a prefix ("ci.").
8fe9b10… lmata 48 // Longest match wins.
8fe9b10… lmata 49 Routes map[string]string
8fe9b10… lmata 50
8fe9b10… lmata 51 // DefaultChannel is used when no route matches.
8fe9b10… lmata 52 // If empty, unrouted events are dropped.
8fe9b10… lmata 53 DefaultChannel string
8fe9b10… lmata 54 }
8fe9b10… lmata 55
8fe9b10… lmata 56 // RateLimiter is a simple token-bucket rate limiter.
8fe9b10… lmata 57 type RateLimiter struct {
8fe9b10… lmata 58 mu sync.Mutex
8fe9b10… lmata 59 tokens float64
8fe9b10… lmata 60 maxBurst float64
8fe9b10… lmata 61 rate float64 // tokens per second
8fe9b10… lmata 62 last time.Time
8fe9b10… lmata 63 }
8fe9b10… lmata 64
8fe9b10… lmata 65 func newRateLimiter(ratePerSec float64, burst int) *RateLimiter {
8fe9b10… lmata 66 return &RateLimiter{
8fe9b10… lmata 67 tokens: float64(burst),
8fe9b10… lmata 68 maxBurst: float64(burst),
8fe9b10… lmata 69 rate: ratePerSec,
8fe9b10… lmata 70 last: time.Now(),
8fe9b10… lmata 71 }
8fe9b10… lmata 72 }
8fe9b10… lmata 73
8fe9b10… lmata 74 // Allow returns true if a token is available.
8fe9b10… lmata 75 func (r *RateLimiter) Allow() bool {
8fe9b10… lmata 76 r.mu.Lock()
8fe9b10… lmata 77 defer r.mu.Unlock()
8fe9b10… lmata 78 now := time.Now()
8fe9b10… lmata 79 elapsed := now.Sub(r.last).Seconds()
8fe9b10… lmata 80 r.last = now
8fe9b10… lmata 81 r.tokens = min(r.maxBurst, r.tokens+elapsed*r.rate)
8fe9b10… lmata 82 if r.tokens >= 1 {
8fe9b10… lmata 83 r.tokens--
8fe9b10… lmata 84 return true
8fe9b10… lmata 85 }
8fe9b10… lmata 86 return false
8fe9b10… lmata 87 }
8fe9b10… lmata 88
8fe9b10… lmata 89 // Bot is the herald bot.
8fe9b10… lmata 90 type Bot struct {
8fe9b10… lmata 91 ircAddr string
8fe9b10… lmata 92 password string
3420a83… lmata 93 channels []string
8fe9b10… lmata 94 routes RouteConfig
8fe9b10… lmata 95 limiter *RateLimiter
8fe9b10… lmata 96 queue chan Event
8fe9b10… lmata 97 log *slog.Logger
8fe9b10… lmata 98 client *girc.Client
8fe9b10… lmata 99 }
8fe9b10… lmata 100
8fe9b10… lmata 101 const defaultQueueSize = 256
8fe9b10… lmata 102
8fe9b10… lmata 103 // New creates a herald bot. ratePerSec and burst configure the token-bucket
8fe9b10… lmata 104 // rate limiter (e.g. 5 messages/sec with burst of 20).
3420a83… lmata 105 func New(ircAddr, password string, channels []string, routes RouteConfig, ratePerSec float64, burst int, log *slog.Logger) *Bot {
8fe9b10… lmata 106 if ratePerSec <= 0 {
8fe9b10… lmata 107 ratePerSec = 5
8fe9b10… lmata 108 }
8fe9b10… lmata 109 if burst <= 0 {
8fe9b10… lmata 110 burst = 20
8fe9b10… lmata 111 }
8fe9b10… lmata 112 return &Bot{
8fe9b10… lmata 113 ircAddr: ircAddr,
8fe9b10… lmata 114 password: password,
3420a83… lmata 115 channels: channels,
8fe9b10… lmata 116 routes: routes,
8fe9b10… lmata 117 limiter: newRateLimiter(ratePerSec, burst),
8fe9b10… lmata 118 queue: make(chan Event, defaultQueueSize),
8fe9b10… lmata 119 log: log,
8fe9b10… lmata 120 }
8fe9b10… lmata 121 }
8fe9b10… lmata 122
8fe9b10… lmata 123 // Name returns the bot's IRC nick.
8fe9b10… lmata 124 func (b *Bot) Name() string { return botNick }
8fe9b10… lmata 125
8fe9b10… lmata 126 // Emit queues an event for delivery. Non-blocking: drops the event if the
8fe9b10… lmata 127 // queue is full and logs a warning.
8fe9b10… lmata 128 func (b *Bot) Emit(e Event) {
8fe9b10… lmata 129 select {
8fe9b10… lmata 130 case b.queue <- e:
8fe9b10… lmata 131 default:
8fe9b10… lmata 132 if b.log != nil {
8fe9b10… lmata 133 b.log.Warn("herald: queue full, dropping event", "type", e.Type)
8fe9b10… lmata 134 }
8fe9b10… lmata 135 }
8fe9b10… lmata 136 }
8fe9b10… lmata 137
8fe9b10… lmata 138 // Start connects to IRC and begins processing events. Blocks until ctx is cancelled.
8fe9b10… lmata 139 func (b *Bot) Start(ctx context.Context) error {
8fe9b10… lmata 140 host, port, err := splitHostPort(b.ircAddr)
8fe9b10… lmata 141 if err != nil {
8fe9b10… lmata 142 return fmt.Errorf("herald: parse irc addr: %w", err)
8fe9b10… lmata 143 }
8fe9b10… lmata 144
8fe9b10… lmata 145 c := girc.New(girc.Config{
d924aea… lmata 146 Server: host,
d924aea… lmata 147 Port: port,
d924aea… lmata 148 Nick: botNick,
d924aea… lmata 149 User: botNick,
d924aea… lmata 150 Name: "scuttlebot herald",
81587e6… lmata 151 SASL: &girc.SASLPlain{User: botNick, Pass: b.password},
81587e6… lmata 152 PingDelay: 30 * time.Second,
81587e6… lmata 153 PingTimeout: 30 * time.Second,
81587e6… lmata 154 SSL: false,
bd16e1f… lmata 155 })
bd16e1f… lmata 156
3420a83… lmata 157 c.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, _ girc.Event) {
f64fe5f… noreply 158 cl.Cmd.Mode(cl.GetNick(), "+B")
3420a83… lmata 159 for _, ch := range b.channels {
3420a83… lmata 160 cl.Cmd.Join(ch)
3420a83… lmata 161 }
bd16e1f… lmata 162 if b.log != nil {
3420a83… lmata 163 b.log.Info("herald connected", "channels", b.channels)
bd16e1f… lmata 164 }
bd16e1f… lmata 165 })
bd16e1f… lmata 166
bd16e1f… lmata 167 c.Handlers.AddBg(girc.INVITE, func(cl *girc.Client, e girc.Event) {
bd16e1f… lmata 168 if ch := e.Last(); strings.HasPrefix(ch, "#") {
bd16e1f… lmata 169 cl.Cmd.Join(ch)
e8d318d… noreply 170 }
e8d318d… noreply 171 })
e8d318d… noreply 172
e8d318d… noreply 173 router := cmdparse.NewRouter(botNick)
e8d318d… noreply 174 router.Register(cmdparse.Command{
e8d318d… noreply 175 Name: "status",
e8d318d… noreply 176 Usage: "STATUS",
e8d318d… noreply 177 Description: "show webhook endpoint status and recent events",
e8d318d… noreply 178 Handler: func(_ *cmdparse.Context, _ string) string { return "not implemented yet" },
e8d318d… noreply 179 })
e8d318d… noreply 180 router.Register(cmdparse.Command{
e8d318d… noreply 181 Name: "test",
e8d318d… noreply 182 Usage: "TEST #channel",
e8d318d… noreply 183 Description: "send a test event to a channel",
e8d318d… noreply 184 Handler: func(_ *cmdparse.Context, _ string) string { return "not implemented yet" },
e8d318d… noreply 185 })
e8d318d… noreply 186
e8d318d… noreply 187 c.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) {
e8d318d… noreply 188 if len(e.Params) < 1 || e.Source == nil {
e8d318d… noreply 189 return
e8d318d… noreply 190 }
e8d318d… noreply 191 // Dispatch commands (DMs and channel messages).
e8d318d… noreply 192 if reply := router.Dispatch(e.Source.Name, e.Params[0], e.Last()); reply != nil {
e8d318d… noreply 193 cl.Cmd.Message(reply.Target, reply.Text)
e8d318d… noreply 194 return
8fe9b10… lmata 195 }
8fe9b10… lmata 196 })
8fe9b10… lmata 197
8fe9b10… lmata 198 b.client = c
8fe9b10… lmata 199
8fe9b10… lmata 200 errCh := make(chan error, 1)
8fe9b10… lmata 201 go func() {
8fe9b10… lmata 202 if err := c.Connect(); err != nil && ctx.Err() == nil {
8fe9b10… lmata 203 errCh <- err
8fe9b10… lmata 204 }
8fe9b10… lmata 205 }()
8fe9b10… lmata 206
8fe9b10… lmata 207 // Event delivery loop.
8fe9b10… lmata 208 go b.deliverLoop(ctx)
8fe9b10… lmata 209
8fe9b10… lmata 210 select {
8fe9b10… lmata 211 case <-ctx.Done():
8fe9b10… lmata 212 c.Close()
8fe9b10… lmata 213 return nil
8fe9b10… lmata 214 case err := <-errCh:
8fe9b10… lmata 215 return fmt.Errorf("herald: irc connection: %w", err)
8fe9b10… lmata 216 }
8fe9b10… lmata 217 }
8fe9b10… lmata 218
8fe9b10… lmata 219 // Stop disconnects the bot.
8fe9b10… lmata 220 func (b *Bot) Stop() {
8fe9b10… lmata 221 if b.client != nil {
8fe9b10… lmata 222 b.client.Close()
8fe9b10… lmata 223 }
8fe9b10… lmata 224 }
8fe9b10… lmata 225
8fe9b10… lmata 226 func (b *Bot) deliverLoop(ctx context.Context) {
8fe9b10… lmata 227 for {
8fe9b10… lmata 228 select {
8fe9b10… lmata 229 case <-ctx.Done():
8fe9b10… lmata 230 return
8fe9b10… lmata 231 case evt := <-b.queue:
8fe9b10… lmata 232 b.deliver(evt)
8fe9b10… lmata 233 }
8fe9b10… lmata 234 }
8fe9b10… lmata 235 }
8fe9b10… lmata 236
8fe9b10… lmata 237 func (b *Bot) deliver(evt Event) {
8fe9b10… lmata 238 channel := evt.Channel
8fe9b10… lmata 239 if channel == "" {
8fe9b10… lmata 240 channel = b.route(evt.Type)
8fe9b10… lmata 241 }
8fe9b10… lmata 242 if channel == "" {
8fe9b10… lmata 243 if b.log != nil {
8fe9b10… lmata 244 b.log.Warn("herald: no route for event, dropping", "type", evt.Type)
8fe9b10… lmata 245 }
8fe9b10… lmata 246 return
8fe9b10… lmata 247 }
8fe9b10… lmata 248
8fe9b10… lmata 249 if !b.limiter.Allow() {
8fe9b10… lmata 250 if b.log != nil {
8fe9b10… lmata 251 b.log.Warn("herald: rate limited, dropping event", "type", evt.Type, "channel", channel)
8fe9b10… lmata 252 }
8fe9b10… lmata 253 return
8fe9b10… lmata 254 }
8fe9b10… lmata 255
8fe9b10… lmata 256 msg := evt.Message
8fe9b10… lmata 257 if len(evt.MentionNicks) > 0 {
8fe9b10… lmata 258 msg = strings.Join(evt.MentionNicks, " ") + ": " + msg
8fe9b10… lmata 259 }
8fe9b10… lmata 260
8fe9b10… lmata 261 irc := b.client
8fe9b10… lmata 262 if irc != nil {
8fe9b10… lmata 263 irc.Cmd.Message(channel, msg)
8fe9b10… lmata 264 }
8fe9b10… lmata 265 }
8fe9b10… lmata 266
8fe9b10… lmata 267 // route finds the best-matching channel for an event type.
8fe9b10… lmata 268 // Longest prefix match wins.
8fe9b10… lmata 269 func (b *Bot) route(eventType string) string {
8fe9b10… lmata 270 best := ""
8fe9b10… lmata 271 bestLen := -1
8fe9b10… lmata 272 for prefix, ch := range b.routes.Routes {
8fe9b10… lmata 273 if strings.HasPrefix(eventType, prefix) && len(prefix) > bestLen {
8fe9b10… lmata 274 best = ch
8fe9b10… lmata 275 bestLen = len(prefix)
8fe9b10… lmata 276 }
8fe9b10… lmata 277 }
8fe9b10… lmata 278 if best != "" {
8fe9b10… lmata 279 return best
8fe9b10… lmata 280 }
8fe9b10… lmata 281 return b.routes.DefaultChannel
8fe9b10… lmata 282 }
8fe9b10… lmata 283
8fe9b10… lmata 284 func splitHostPort(addr string) (string, int, error) {
18e8fef… lmata 285 host, portStr, err := net.SplitHostPort(addr)
18e8fef… lmata 286 if err != nil {
8fe9b10… lmata 287 return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
18e8fef… lmata 288 }
18e8fef… lmata 289 port, err := strconv.Atoi(portStr)
18e8fef… lmata 290 if err != nil {
18e8fef… lmata 291 return "", 0, fmt.Errorf("invalid port in %q: %w", addr, err)
8fe9b10… lmata 292 }
8fe9b10… lmata 293 return host, port, nil
8fe9b10… lmata 294 }
8fe9b10… lmata 295
8fe9b10… lmata 296 func min(a, b float64) float64 {
8fe9b10… lmata 297 if a < b {
8fe9b10… lmata 298 return a
8fe9b10… lmata 299 }
8fe9b10… lmata 300 return b
8fe9b10… lmata 301 }

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button