ScuttleBot

scuttlebot / pkg / client / client.go
Source Blame History 278 lines
7afa71e… lmata 1 // Package client provides a Go SDK for connecting agents to scuttlebot.
7afa71e… lmata 2 //
7afa71e… lmata 3 // Agents use this package to join IRC channels, send structured messages, and
7afa71e… lmata 4 // register handlers for incoming message types. IRC is abstracted entirely —
7afa71e… lmata 5 // callers think in terms of channels and message types, not IRC primitives.
7afa71e… lmata 6 //
7afa71e… lmata 7 // Quick start:
7afa71e… lmata 8 //
7afa71e… lmata 9 // c, err := client.New(client.Options{
7afa71e… lmata 10 // ServerAddr: "127.0.0.1:6667",
7afa71e… lmata 11 // Nick: "my-agent",
7afa71e… lmata 12 // Password: creds.Password,
7afa71e… lmata 13 // Channels: []string{"#fleet"},
7afa71e… lmata 14 // })
7afa71e… lmata 15 // c.Handle("task.create", func(ctx context.Context, env *protocol.Envelope) error {
7afa71e… lmata 16 // // handle the task
7afa71e… lmata 17 // return nil
7afa71e… lmata 18 // })
7afa71e… lmata 19 // err = c.Run(ctx)
cadb504… lmata 20 package client
7afa71e… lmata 21
7afa71e… lmata 22 import (
7afa71e… lmata 23 "context"
7afa71e… lmata 24 "fmt"
7afa71e… lmata 25 "log/slog"
7afa71e… lmata 26 "strings"
7afa71e… lmata 27 "sync"
7afa71e… lmata 28 "time"
7afa71e… lmata 29
7afa71e… lmata 30 "github.com/lrstanley/girc"
7afa71e… lmata 31
7afa71e… lmata 32 "github.com/conflicthq/scuttlebot/pkg/protocol"
7afa71e… lmata 33 )
7afa71e… lmata 34
7afa71e… lmata 35 const (
7afa71e… lmata 36 reconnectBase = 2 * time.Second
7afa71e… lmata 37 reconnectMax = 60 * time.Second
7afa71e… lmata 38 )
7afa71e… lmata 39
7afa71e… lmata 40 // HandlerFunc is called when a message of a registered type arrives.
7afa71e… lmata 41 // Returning a non-nil error logs it but does not disconnect the client.
7afa71e… lmata 42 type HandlerFunc func(ctx context.Context, env *protocol.Envelope) error
7afa71e… lmata 43
7afa71e… lmata 44 // Options configures a Client.
7afa71e… lmata 45 type Options struct {
7afa71e… lmata 46 // ServerAddr is the IRC server address (host:port).
7afa71e… lmata 47 ServerAddr string
7afa71e… lmata 48
7afa71e… lmata 49 // Nick is the IRC nick and NickServ account name.
7afa71e… lmata 50 Nick string
7afa71e… lmata 51
7afa71e… lmata 52 // Password is the NickServ / SASL password received at registration.
7afa71e… lmata 53 Password string
7afa71e… lmata 54
7afa71e… lmata 55 // Channels is the list of IRC channels to join on connect.
7afa71e… lmata 56 Channels []string
7afa71e… lmata 57
7afa71e… lmata 58 // Log is an optional structured logger. Defaults to discarding all output.
7afa71e… lmata 59 Log *slog.Logger
7afa71e… lmata 60 }
7afa71e… lmata 61
7afa71e… lmata 62 // Client connects an agent to scuttlebot over IRC.
7afa71e… lmata 63 type Client struct {
7afa71e… lmata 64 opts Options
7afa71e… lmata 65 log *slog.Logger
7afa71e… lmata 66 mu sync.RWMutex
7afa71e… lmata 67 handlers map[string][]HandlerFunc
7afa71e… lmata 68 irc *girc.Client
7afa71e… lmata 69 }
7afa71e… lmata 70
7afa71e… lmata 71 // New creates a Client. Call Run to connect.
7afa71e… lmata 72 func New(opts Options) (*Client, error) {
7afa71e… lmata 73 if opts.ServerAddr == "" {
7afa71e… lmata 74 return nil, fmt.Errorf("client: ServerAddr is required")
7afa71e… lmata 75 }
7afa71e… lmata 76 if opts.Nick == "" {
7afa71e… lmata 77 return nil, fmt.Errorf("client: Nick is required")
7afa71e… lmata 78 }
7afa71e… lmata 79 if opts.Password == "" {
7afa71e… lmata 80 return nil, fmt.Errorf("client: Password is required")
7afa71e… lmata 81 }
7afa71e… lmata 82 log := opts.Log
7afa71e… lmata 83 if log == nil {
7afa71e… lmata 84 log = slog.New(slog.NewTextHandler(noopWriter{}, nil))
7afa71e… lmata 85 }
7afa71e… lmata 86 return &Client{
7afa71e… lmata 87 opts: opts,
7afa71e… lmata 88 log: log,
7afa71e… lmata 89 handlers: make(map[string][]HandlerFunc),
7afa71e… lmata 90 }, nil
7afa71e… lmata 91 }
7afa71e… lmata 92
7afa71e… lmata 93 // Handle registers a handler for messages of the given type (e.g. "task.create").
7afa71e… lmata 94 // Multiple handlers can be registered for the same type; they run concurrently.
7afa71e… lmata 95 // Use "*" to receive every message type.
7afa71e… lmata 96 func (c *Client) Handle(msgType string, fn HandlerFunc) {
7afa71e… lmata 97 c.mu.Lock()
7afa71e… lmata 98 defer c.mu.Unlock()
7afa71e… lmata 99 c.handlers[msgType] = append(c.handlers[msgType], fn)
7afa71e… lmata 100 }
7afa71e… lmata 101
7afa71e… lmata 102 // Send encodes payload as a protocol.Envelope of the given type and sends it
7afa71e… lmata 103 // to channel as a PRIVMSG.
7afa71e… lmata 104 func (c *Client) Send(ctx context.Context, channel, msgType string, payload any) error {
7afa71e… lmata 105 env, err := protocol.New(msgType, c.opts.Nick, payload)
7afa71e… lmata 106 if err != nil {
7afa71e… lmata 107 return fmt.Errorf("client: build envelope: %w", err)
7afa71e… lmata 108 }
7afa71e… lmata 109 data, err := protocol.Marshal(env)
7afa71e… lmata 110 if err != nil {
7afa71e… lmata 111 return fmt.Errorf("client: marshal envelope: %w", err)
7afa71e… lmata 112 }
7afa71e… lmata 113
7afa71e… lmata 114 c.mu.RLock()
7afa71e… lmata 115 irc := c.irc
7afa71e… lmata 116 c.mu.RUnlock()
7afa71e… lmata 117
7afa71e… lmata 118 if irc == nil {
7afa71e… lmata 119 return fmt.Errorf("client: not connected")
7afa71e… lmata 120 }
7afa71e… lmata 121 irc.Cmd.Message(channel, string(data))
7afa71e… lmata 122 return nil
7afa71e… lmata 123 }
7afa71e… lmata 124
7afa71e… lmata 125 // Run connects to IRC and blocks until ctx is cancelled. It reconnects with
7afa71e… lmata 126 // exponential backoff if the connection drops.
7afa71e… lmata 127 func (c *Client) Run(ctx context.Context) error {
7afa71e… lmata 128 wait := reconnectBase
7afa71e… lmata 129 for {
7afa71e… lmata 130 if ctx.Err() != nil {
7afa71e… lmata 131 return nil
7afa71e… lmata 132 }
7afa71e… lmata 133
7afa71e… lmata 134 err := c.connect(ctx)
7afa71e… lmata 135 if ctx.Err() != nil {
7afa71e… lmata 136 return nil
7afa71e… lmata 137 }
7afa71e… lmata 138 if err != nil {
7afa71e… lmata 139 c.log.Warn("irc connection error, reconnecting", "err", err, "wait", wait)
7afa71e… lmata 140 } else {
7afa71e… lmata 141 c.log.Info("irc disconnected, reconnecting", "wait", wait)
7afa71e… lmata 142 }
7afa71e… lmata 143
7afa71e… lmata 144 select {
7afa71e… lmata 145 case <-ctx.Done():
7afa71e… lmata 146 return nil
7afa71e… lmata 147 case <-time.After(wait):
7afa71e… lmata 148 }
7afa71e… lmata 149 wait = minDuration(wait*2, reconnectMax)
7afa71e… lmata 150 }
7afa71e… lmata 151 }
7afa71e… lmata 152
7afa71e… lmata 153 // connect makes one connection attempt. Blocks until disconnected or ctx done.
7afa71e… lmata 154 func (c *Client) connect(ctx context.Context) error {
7afa71e… lmata 155 host, port, err := splitHostPort(c.opts.ServerAddr)
7afa71e… lmata 156 if err != nil {
7afa71e… lmata 157 return fmt.Errorf("parse server addr: %w", err)
7afa71e… lmata 158 }
7afa71e… lmata 159
7afa71e… lmata 160 irc := girc.New(girc.Config{
7afa71e… lmata 161 Server: host,
7afa71e… lmata 162 Port: port,
7afa71e… lmata 163 Nick: c.opts.Nick,
7afa71e… lmata 164 User: c.opts.Nick,
7afa71e… lmata 165 Name: c.opts.Nick,
7afa71e… lmata 166 SASL: &girc.SASLPlain{User: c.opts.Nick, Pass: c.opts.Password},
7afa71e… lmata 167 SSL: false,
7afa71e… lmata 168 })
7afa71e… lmata 169
7afa71e… lmata 170 irc.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, e girc.Event) {
7afa71e… lmata 171 for _, ch := range c.opts.Channels {
7afa71e… lmata 172 cl.Cmd.Join(ch)
7afa71e… lmata 173 }
7afa71e… lmata 174 c.log.Info("connected to irc", "server", c.opts.ServerAddr, "channels", c.opts.Channels)
7afa71e… lmata 175 // Reset backoff in caller on successful connect.
7afa71e… lmata 176 })
7afa71e… lmata 177
7afa71e… lmata 178 irc.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) {
7afa71e… lmata 179 if len(e.Params) < 1 {
7afa71e… lmata 180 return
7afa71e… lmata 181 }
7afa71e… lmata 182 channel := e.Params[0]
7afa71e… lmata 183 if !strings.HasPrefix(channel, "#") {
7afa71e… lmata 184 return // ignore DMs
7afa71e… lmata 185 }
7afa71e… lmata 186 text := e.Last()
7afa71e… lmata 187 env, err := protocol.Unmarshal([]byte(text))
7afa71e… lmata 188 if err != nil {
7afa71e… lmata 189 return // non-JSON PRIVMSG (human chat) — silently ignored
7afa71e… lmata 190 }
9eb7d9e… noreply 191
9eb7d9e… noreply 192 // Populate IRCv3 transport metadata.
9eb7d9e… noreply 193 env.Channel = channel
9eb7d9e… noreply 194 env.ServerTime = e.Timestamp
9eb7d9e… noreply 195 if acct, ok := e.Tags.Get("account"); ok {
9eb7d9e… noreply 196 env.Account = acct
9eb7d9e… noreply 197 }
9eb7d9e… noreply 198 if msgID, ok := e.Tags.Get("msgid"); ok {
9eb7d9e… noreply 199 env.MsgID = msgID
9eb7d9e… noreply 200 }
9eb7d9e… noreply 201 if len(e.Tags) > 0 {
9eb7d9e… noreply 202 env.Tags = make(map[string]string, len(e.Tags))
9eb7d9e… noreply 203 for k, v := range e.Tags {
9eb7d9e… noreply 204 env.Tags[k] = v
9eb7d9e… noreply 205 }
9eb7d9e… noreply 206 }
9eb7d9e… noreply 207
7afa71e… lmata 208 c.dispatch(ctx, env)
7afa71e… lmata 209 })
7afa71e… lmata 210
7afa71e… lmata 211 // NOTICE is ignored — system/human commentary, not agent traffic.
7afa71e… lmata 212
7afa71e… lmata 213 c.mu.Lock()
7afa71e… lmata 214 c.irc = irc
7afa71e… lmata 215 c.mu.Unlock()
7afa71e… lmata 216
7afa71e… lmata 217 errCh := make(chan error, 1)
7afa71e… lmata 218 go func() {
7afa71e… lmata 219 if err := irc.Connect(); err != nil {
7afa71e… lmata 220 errCh <- err
7afa71e… lmata 221 } else {
7afa71e… lmata 222 errCh <- nil
7afa71e… lmata 223 }
7afa71e… lmata 224 }()
7afa71e… lmata 225
7afa71e… lmata 226 select {
7afa71e… lmata 227 case <-ctx.Done():
7afa71e… lmata 228 irc.Close()
7afa71e… lmata 229 c.mu.Lock()
7afa71e… lmata 230 c.irc = nil
7afa71e… lmata 231 c.mu.Unlock()
7afa71e… lmata 232 return nil
7afa71e… lmata 233 case err := <-errCh:
7afa71e… lmata 234 c.mu.Lock()
7afa71e… lmata 235 c.irc = nil
7afa71e… lmata 236 c.mu.Unlock()
7afa71e… lmata 237 return err
7afa71e… lmata 238 }
7afa71e… lmata 239 }
7afa71e… lmata 240
7afa71e… lmata 241 // dispatch delivers an envelope to all matching handlers, each in its own goroutine.
7afa71e… lmata 242 func (c *Client) dispatch(ctx context.Context, env *protocol.Envelope) {
7afa71e… lmata 243 c.mu.RLock()
7afa71e… lmata 244 typed := append([]HandlerFunc(nil), c.handlers[env.Type]...)
7afa71e… lmata 245 wild := append([]HandlerFunc(nil), c.handlers["*"]...)
7afa71e… lmata 246 c.mu.RUnlock()
7afa71e… lmata 247
7afa71e… lmata 248 fns := append(typed, wild...)
7afa71e… lmata 249 for _, fn := range fns {
7afa71e… lmata 250 fn := fn
7afa71e… lmata 251 go func() {
7afa71e… lmata 252 if err := fn(ctx, env); err != nil {
7afa71e… lmata 253 c.log.Error("handler error", "type", env.Type, "id", env.ID, "err", err)
7afa71e… lmata 254 }
7afa71e… lmata 255 }()
7afa71e… lmata 256 }
7afa71e… lmata 257 }
7afa71e… lmata 258
7afa71e… lmata 259 func splitHostPort(addr string) (string, int, error) {
7afa71e… lmata 260 var host string
7afa71e… lmata 261 var port int
7afa71e… lmata 262 if _, err := fmt.Sscanf(addr, "%[^:]:%d", &host, &port); err != nil {
7afa71e… lmata 263 return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
7afa71e… lmata 264 }
7afa71e… lmata 265 return host, port, nil
7afa71e… lmata 266 }
7afa71e… lmata 267
7afa71e… lmata 268 func minDuration(a, b time.Duration) time.Duration {
7afa71e… lmata 269 if a < b {
7afa71e… lmata 270 return a
7afa71e… lmata 271 }
7afa71e… lmata 272 return b
7afa71e… lmata 273 }
7afa71e… lmata 274
7afa71e… lmata 275 // noopWriter satisfies io.Writer for the discard logger.
7afa71e… lmata 276 type noopWriter struct{}
7afa71e… lmata 277
7afa71e… lmata 278 func (noopWriter) Write(p []byte) (int, error) { return len(p), nil }

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button