|
1
|
// Package client provides a Go SDK for connecting agents to scuttlebot.
//
// Agents use this package to join IRC channels, send structured messages, and
// register handlers for incoming message types. IRC is abstracted entirely —
// callers think in terms of channels and message types, not IRC primitives.
//
// Quick start:
//
//	c, err := client.New(client.Options{
//		ServerAddr: "127.0.0.1:6667",
//		Nick:       "my-agent",
//		Password:   creds.Password,
//		Channels:   []string{"#fleet"},
//	})
//	c.Handle("task.create", func(ctx context.Context, env *protocol.Envelope) error {
//		// handle the task
//		return nil
//	})
//	err = c.Run(ctx)
|
20
|
package client |
|
21
|
|
|
22
|
import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/lrstanley/girc"

	"github.com/conflicthq/scuttlebot/pkg/protocol"
)
|
34
|
|
|
35
|
// Bounds for the delay Run inserts between connection attempts.
const (
	// reconnectBase is the delay used before the first reconnect attempt.
	reconnectBase = 2 * time.Second

	// reconnectMax is the ceiling the doubling reconnect delay never exceeds.
	reconnectMax = 60 * time.Second
)
|
39
|
|
|
40
|
// HandlerFunc is called when a message of a registered type arrives. |
|
41
|
// Returning a non-nil error logs it but does not disconnect the client. |
|
42
|
type HandlerFunc func(ctx context.Context, env *protocol.Envelope) error |
|
43
|
|
|
44
|
// Options holds the settings a Client is built from. ServerAddr, Nick and
// Password are mandatory (New rejects empty values); the rest are optional.
type Options struct {
	// ServerAddr is the address of the IRC server, written as host:port.
	ServerAddr string

	// Nick doubles as the IRC nick and the NickServ account name.
	Nick string

	// Password is the NickServ / SASL password received at registration.
	Password string

	// Channels lists the IRC channels joined once the connection is up.
	Channels []string

	// Log is an optional structured logger. When nil, all log output is
	// discarded.
	Log *slog.Logger
}
|
61
|
|
|
62
|
// Client connects an agent to scuttlebot over IRC. |
|
63
|
type Client struct { |
|
64
|
opts Options |
|
65
|
log *slog.Logger |
|
66
|
mu sync.RWMutex |
|
67
|
handlers map[string][]HandlerFunc |
|
68
|
irc *girc.Client |
|
69
|
} |
|
70
|
|
|
71
|
// New creates a Client. Call Run to connect. |
|
72
|
func New(opts Options) (*Client, error) { |
|
73
|
if opts.ServerAddr == "" { |
|
74
|
return nil, fmt.Errorf("client: ServerAddr is required") |
|
75
|
} |
|
76
|
if opts.Nick == "" { |
|
77
|
return nil, fmt.Errorf("client: Nick is required") |
|
78
|
} |
|
79
|
if opts.Password == "" { |
|
80
|
return nil, fmt.Errorf("client: Password is required") |
|
81
|
} |
|
82
|
log := opts.Log |
|
83
|
if log == nil { |
|
84
|
log = slog.New(slog.NewTextHandler(noopWriter{}, nil)) |
|
85
|
} |
|
86
|
return &Client{ |
|
87
|
opts: opts, |
|
88
|
log: log, |
|
89
|
handlers: make(map[string][]HandlerFunc), |
|
90
|
}, nil |
|
91
|
} |
|
92
|
|
|
93
|
// Handle registers a handler for messages of the given type (e.g. "task.create"). |
|
94
|
// Multiple handlers can be registered for the same type; they run concurrently. |
|
95
|
// Use "*" to receive every message type. |
|
96
|
func (c *Client) Handle(msgType string, fn HandlerFunc) { |
|
97
|
c.mu.Lock() |
|
98
|
defer c.mu.Unlock() |
|
99
|
c.handlers[msgType] = append(c.handlers[msgType], fn) |
|
100
|
} |
|
101
|
|
|
102
|
// Send encodes payload as a protocol.Envelope of the given type and sends it |
|
103
|
// to channel as a PRIVMSG. |
|
104
|
func (c *Client) Send(ctx context.Context, channel, msgType string, payload any) error { |
|
105
|
env, err := protocol.New(msgType, c.opts.Nick, payload) |
|
106
|
if err != nil { |
|
107
|
return fmt.Errorf("client: build envelope: %w", err) |
|
108
|
} |
|
109
|
data, err := protocol.Marshal(env) |
|
110
|
if err != nil { |
|
111
|
return fmt.Errorf("client: marshal envelope: %w", err) |
|
112
|
} |
|
113
|
|
|
114
|
c.mu.RLock() |
|
115
|
irc := c.irc |
|
116
|
c.mu.RUnlock() |
|
117
|
|
|
118
|
if irc == nil { |
|
119
|
return fmt.Errorf("client: not connected") |
|
120
|
} |
|
121
|
irc.Cmd.Message(channel, string(data)) |
|
122
|
return nil |
|
123
|
} |
|
124
|
|
|
125
|
// Run connects to IRC and blocks until ctx is cancelled. It reconnects with |
|
126
|
// exponential backoff if the connection drops. |
|
127
|
func (c *Client) Run(ctx context.Context) error { |
|
128
|
wait := reconnectBase |
|
129
|
for { |
|
130
|
if ctx.Err() != nil { |
|
131
|
return nil |
|
132
|
} |
|
133
|
|
|
134
|
err := c.connect(ctx) |
|
135
|
if ctx.Err() != nil { |
|
136
|
return nil |
|
137
|
} |
|
138
|
if err != nil { |
|
139
|
c.log.Warn("irc connection error, reconnecting", "err", err, "wait", wait) |
|
140
|
} else { |
|
141
|
c.log.Info("irc disconnected, reconnecting", "wait", wait) |
|
142
|
} |
|
143
|
|
|
144
|
select { |
|
145
|
case <-ctx.Done(): |
|
146
|
return nil |
|
147
|
case <-time.After(wait): |
|
148
|
} |
|
149
|
wait = minDuration(wait*2, reconnectMax) |
|
150
|
} |
|
151
|
} |
|
152
|
|
|
153
|
// connect makes one connection attempt. Blocks until disconnected or ctx done. |
|
154
|
func (c *Client) connect(ctx context.Context) error { |
|
155
|
host, port, err := splitHostPort(c.opts.ServerAddr) |
|
156
|
if err != nil { |
|
157
|
return fmt.Errorf("parse server addr: %w", err) |
|
158
|
} |
|
159
|
|
|
160
|
irc := girc.New(girc.Config{ |
|
161
|
Server: host, |
|
162
|
Port: port, |
|
163
|
Nick: c.opts.Nick, |
|
164
|
User: c.opts.Nick, |
|
165
|
Name: c.opts.Nick, |
|
166
|
SASL: &girc.SASLPlain{User: c.opts.Nick, Pass: c.opts.Password}, |
|
167
|
SSL: false, |
|
168
|
}) |
|
169
|
|
|
170
|
irc.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, e girc.Event) { |
|
171
|
for _, ch := range c.opts.Channels { |
|
172
|
cl.Cmd.Join(ch) |
|
173
|
} |
|
174
|
c.log.Info("connected to irc", "server", c.opts.ServerAddr, "channels", c.opts.Channels) |
|
175
|
// Reset backoff in caller on successful connect. |
|
176
|
}) |
|
177
|
|
|
178
|
irc.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) { |
|
179
|
if len(e.Params) < 1 { |
|
180
|
return |
|
181
|
} |
|
182
|
channel := e.Params[0] |
|
183
|
if !strings.HasPrefix(channel, "#") { |
|
184
|
return // ignore DMs |
|
185
|
} |
|
186
|
text := e.Last() |
|
187
|
env, err := protocol.Unmarshal([]byte(text)) |
|
188
|
if err != nil { |
|
189
|
return // non-JSON PRIVMSG (human chat) — silently ignored |
|
190
|
} |
|
191
|
|
|
192
|
// Populate IRCv3 transport metadata. |
|
193
|
env.Channel = channel |
|
194
|
env.ServerTime = e.Timestamp |
|
195
|
if acct, ok := e.Tags.Get("account"); ok { |
|
196
|
env.Account = acct |
|
197
|
} |
|
198
|
if msgID, ok := e.Tags.Get("msgid"); ok { |
|
199
|
env.MsgID = msgID |
|
200
|
} |
|
201
|
if len(e.Tags) > 0 { |
|
202
|
env.Tags = make(map[string]string, len(e.Tags)) |
|
203
|
for k, v := range e.Tags { |
|
204
|
env.Tags[k] = v |
|
205
|
} |
|
206
|
} |
|
207
|
|
|
208
|
c.dispatch(ctx, env) |
|
209
|
}) |
|
210
|
|
|
211
|
// NOTICE is ignored — system/human commentary, not agent traffic. |
|
212
|
|
|
213
|
c.mu.Lock() |
|
214
|
c.irc = irc |
|
215
|
c.mu.Unlock() |
|
216
|
|
|
217
|
errCh := make(chan error, 1) |
|
218
|
go func() { |
|
219
|
if err := irc.Connect(); err != nil { |
|
220
|
errCh <- err |
|
221
|
} else { |
|
222
|
errCh <- nil |
|
223
|
} |
|
224
|
}() |
|
225
|
|
|
226
|
select { |
|
227
|
case <-ctx.Done(): |
|
228
|
irc.Close() |
|
229
|
c.mu.Lock() |
|
230
|
c.irc = nil |
|
231
|
c.mu.Unlock() |
|
232
|
return nil |
|
233
|
case err := <-errCh: |
|
234
|
c.mu.Lock() |
|
235
|
c.irc = nil |
|
236
|
c.mu.Unlock() |
|
237
|
return err |
|
238
|
} |
|
239
|
} |
|
240
|
|
|
241
|
// dispatch delivers an envelope to all matching handlers, each in its own goroutine. |
|
242
|
func (c *Client) dispatch(ctx context.Context, env *protocol.Envelope) { |
|
243
|
c.mu.RLock() |
|
244
|
typed := append([]HandlerFunc(nil), c.handlers[env.Type]...) |
|
245
|
wild := append([]HandlerFunc(nil), c.handlers["*"]...) |
|
246
|
c.mu.RUnlock() |
|
247
|
|
|
248
|
fns := append(typed, wild...) |
|
249
|
for _, fn := range fns { |
|
250
|
fn := fn |
|
251
|
go func() { |
|
252
|
if err := fn(ctx, env); err != nil { |
|
253
|
c.log.Error("handler error", "type", env.Type, "id", env.ID, "err", err) |
|
254
|
} |
|
255
|
}() |
|
256
|
} |
|
257
|
} |
|
258
|
|
|
259
|
// splitHostPort splits addr, which must have the form "host:port", into the
// host and its numeric port. IPv6 literals must be bracketed ("[::1]:6667");
// the returned host has the brackets removed.
//
// An error is returned when addr cannot be split into host and port, when the
// host is empty, or when the port is not a decimal number in 1–65535. On error
// the host and port results are zero values.
func splitHostPort(addr string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
	}
	if host == "" {
		return "", 0, fmt.Errorf("invalid address %q: missing host", addr)
	}

	// bitSize 16 makes ParseUint reject anything above 65535.
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
	}
	if port == 0 {
		return "", 0, fmt.Errorf("invalid address %q: port must be non-zero", addr)
	}
	return host, int(port), nil
}
|
267
|
|
|
268
|
// minDuration returns the smaller of a and b.
func minDuration(a, b time.Duration) time.Duration {
	if b <= a {
		return b
	}
	return a
}
|
274
|
|
|
275
|
// noopWriter is an io.Writer that throws away everything written to it. New
// uses it as the sink of the default logger.
type noopWriter struct{}

// Write reports all of buf as written without storing any of it.
func (noopWriter) Write(buf []byte) (int, error) {
	return len(buf), nil
}
|
279
|
|