ScuttleBot

scuttlebot / pkg / client / client.go
Blame History Raw 279 lines
1
// Package client provides a Go SDK for connecting agents to scuttlebot.
//
// Agents use this package to join IRC channels, send structured messages, and
// register handlers for incoming message types. IRC is abstracted entirely —
// callers think in terms of channels and message types, not IRC primitives.
//
// Quick start:
//
//	c, err := client.New(client.Options{
//		ServerAddr: "127.0.0.1:6667",
//		Nick:       "my-agent",
//		Password:   creds.Password,
//		Channels:   []string{"#fleet"},
//	})
//	c.Handle("task.create", func(ctx context.Context, env *protocol.Envelope) error {
//		// handle the task
//		return nil
//	})
//	err = c.Run(ctx)
20
package client
21
22
import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/lrstanley/girc"

	"github.com/conflicthq/scuttlebot/pkg/protocol"
)
34
35
// Bounds for the reconnect backoff applied by Run.
const (
	// reconnectBase is the delay before the first reconnect attempt.
	reconnectBase = 2 * time.Second
	// reconnectMax is the ceiling the doubling delay is clamped to.
	reconnectMax = time.Minute
)
39
40
// HandlerFunc is called when a message of a registered type arrives.
//
// ctx is the context that was passed to Run. env is the decoded envelope; the
// same pointer is handed to every handler matching the message, and handlers
// run in separate goroutines, so treat it as read-only.
//
// Returning a non-nil error logs it but does not disconnect the client.
type HandlerFunc func(ctx context.Context, env *protocol.Envelope) error
43
44
// Options holds the settings a Client is built from. ServerAddr, Nick and
// Password are mandatory; New rejects an Options value missing any of them.
type Options struct {
	// ServerAddr is the address of the IRC server, written as host:port.
	ServerAddr string

	// Nick doubles as the IRC nick and the NickServ account name.
	Nick string

	// Password is the NickServ / SASL password issued at registration.
	Password string

	// Channels lists the IRC channels joined each time a connection is made.
	Channels []string

	// Log receives structured log output. When nil, all output is discarded.
	Log *slog.Logger
}
61
62
// Client connects an agent to scuttlebot over IRC.
//
// Build one with New, register handlers with Handle, then call Run to connect.
type Client struct {
	// opts is the configuration captured by New.
	opts Options
	// log is the logger used for connection and handler diagnostics; New
	// substitutes a discarding logger when Options.Log is nil.
	log *slog.Logger
	// mu guards handlers and irc.
	mu sync.RWMutex
	// handlers maps a message type (or "*") to the handlers registered for it.
	handlers map[string][]HandlerFunc
	// irc is the connection for the attempt in progress; nil when no attempt
	// is active.
	irc *girc.Client
}
70
71
// New creates a Client. Call Run to connect.
72
func New(opts Options) (*Client, error) {
73
if opts.ServerAddr == "" {
74
return nil, fmt.Errorf("client: ServerAddr is required")
75
}
76
if opts.Nick == "" {
77
return nil, fmt.Errorf("client: Nick is required")
78
}
79
if opts.Password == "" {
80
return nil, fmt.Errorf("client: Password is required")
81
}
82
log := opts.Log
83
if log == nil {
84
log = slog.New(slog.NewTextHandler(noopWriter{}, nil))
85
}
86
return &Client{
87
opts: opts,
88
log: log,
89
handlers: make(map[string][]HandlerFunc),
90
}, nil
91
}
92
93
// Handle registers a handler for messages of the given type (e.g. "task.create").
94
// Multiple handlers can be registered for the same type; they run concurrently.
95
// Use "*" to receive every message type.
96
func (c *Client) Handle(msgType string, fn HandlerFunc) {
97
c.mu.Lock()
98
defer c.mu.Unlock()
99
c.handlers[msgType] = append(c.handlers[msgType], fn)
100
}
101
102
// Send encodes payload as a protocol.Envelope of the given type and sends it
103
// to channel as a PRIVMSG.
104
func (c *Client) Send(ctx context.Context, channel, msgType string, payload any) error {
105
env, err := protocol.New(msgType, c.opts.Nick, payload)
106
if err != nil {
107
return fmt.Errorf("client: build envelope: %w", err)
108
}
109
data, err := protocol.Marshal(env)
110
if err != nil {
111
return fmt.Errorf("client: marshal envelope: %w", err)
112
}
113
114
c.mu.RLock()
115
irc := c.irc
116
c.mu.RUnlock()
117
118
if irc == nil {
119
return fmt.Errorf("client: not connected")
120
}
121
irc.Cmd.Message(channel, string(data))
122
return nil
123
}
124
125
// Run connects to IRC and blocks until ctx is cancelled. It reconnects with
126
// exponential backoff if the connection drops.
127
func (c *Client) Run(ctx context.Context) error {
128
wait := reconnectBase
129
for {
130
if ctx.Err() != nil {
131
return nil
132
}
133
134
err := c.connect(ctx)
135
if ctx.Err() != nil {
136
return nil
137
}
138
if err != nil {
139
c.log.Warn("irc connection error, reconnecting", "err", err, "wait", wait)
140
} else {
141
c.log.Info("irc disconnected, reconnecting", "wait", wait)
142
}
143
144
select {
145
case <-ctx.Done():
146
return nil
147
case <-time.After(wait):
148
}
149
wait = minDuration(wait*2, reconnectMax)
150
}
151
}
152
153
// connect makes one connection attempt. Blocks until disconnected or ctx done.
154
func (c *Client) connect(ctx context.Context) error {
155
host, port, err := splitHostPort(c.opts.ServerAddr)
156
if err != nil {
157
return fmt.Errorf("parse server addr: %w", err)
158
}
159
160
irc := girc.New(girc.Config{
161
Server: host,
162
Port: port,
163
Nick: c.opts.Nick,
164
User: c.opts.Nick,
165
Name: c.opts.Nick,
166
SASL: &girc.SASLPlain{User: c.opts.Nick, Pass: c.opts.Password},
167
SSL: false,
168
})
169
170
irc.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, e girc.Event) {
171
for _, ch := range c.opts.Channels {
172
cl.Cmd.Join(ch)
173
}
174
c.log.Info("connected to irc", "server", c.opts.ServerAddr, "channels", c.opts.Channels)
175
// Reset backoff in caller on successful connect.
176
})
177
178
irc.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) {
179
if len(e.Params) < 1 {
180
return
181
}
182
channel := e.Params[0]
183
if !strings.HasPrefix(channel, "#") {
184
return // ignore DMs
185
}
186
text := e.Last()
187
env, err := protocol.Unmarshal([]byte(text))
188
if err != nil {
189
return // non-JSON PRIVMSG (human chat) — silently ignored
190
}
191
192
// Populate IRCv3 transport metadata.
193
env.Channel = channel
194
env.ServerTime = e.Timestamp
195
if acct, ok := e.Tags.Get("account"); ok {
196
env.Account = acct
197
}
198
if msgID, ok := e.Tags.Get("msgid"); ok {
199
env.MsgID = msgID
200
}
201
if len(e.Tags) > 0 {
202
env.Tags = make(map[string]string, len(e.Tags))
203
for k, v := range e.Tags {
204
env.Tags[k] = v
205
}
206
}
207
208
c.dispatch(ctx, env)
209
})
210
211
// NOTICE is ignored — system/human commentary, not agent traffic.
212
213
c.mu.Lock()
214
c.irc = irc
215
c.mu.Unlock()
216
217
errCh := make(chan error, 1)
218
go func() {
219
if err := irc.Connect(); err != nil {
220
errCh <- err
221
} else {
222
errCh <- nil
223
}
224
}()
225
226
select {
227
case <-ctx.Done():
228
irc.Close()
229
c.mu.Lock()
230
c.irc = nil
231
c.mu.Unlock()
232
return nil
233
case err := <-errCh:
234
c.mu.Lock()
235
c.irc = nil
236
c.mu.Unlock()
237
return err
238
}
239
}
240
241
// dispatch delivers an envelope to all matching handlers, each in its own goroutine.
242
func (c *Client) dispatch(ctx context.Context, env *protocol.Envelope) {
243
c.mu.RLock()
244
typed := append([]HandlerFunc(nil), c.handlers[env.Type]...)
245
wild := append([]HandlerFunc(nil), c.handlers["*"]...)
246
c.mu.RUnlock()
247
248
fns := append(typed, wild...)
249
for _, fn := range fns {
250
fn := fn
251
go func() {
252
if err := fn(ctx, env); err != nil {
253
c.log.Error("handler error", "type", env.Type, "id", env.ID, "err", err)
254
}
255
}()
256
}
257
}
258
259
// splitHostPort splits addr, written as "host:port", into its host and its
// numeric port. Bracketed IPv6 literals such as "[::1]:6667" are accepted and
// returned without the brackets.
//
// An error is returned when addr is not of that form, the host is empty, or
// the port is not a number between 1 and 65535.
func splitHostPort(addr string) (string, int, error) {
	// fmt's scanning functions have no scanset verb, so a "%[^:]" pattern is
	// rejected for every input; parse with the net package instead.
	host, portText, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
	}
	if host == "" {
		return "", 0, fmt.Errorf("invalid address %q: missing host", addr)
	}

	port, err := strconv.Atoi(portText)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
	}
	if port < 1 || port > 65535 {
		return "", 0, fmt.Errorf("invalid address %q: port %d out of range", addr, port)
	}
	return host, port, nil
}
267
268
// minDuration returns the smaller of a and b; when they are equal, that
// shared value is returned.
func minDuration(a, b time.Duration) time.Duration {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
274
275
// noopWriter is an io.Writer that throws away everything written to it. It
// backs the logger New installs when the caller supplies none.
type noopWriter struct{}

// Write reports all of p as written without storing it and never fails.
func (noopWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	return n, nil
}
279

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button