ScuttleBot

scuttlebot / pkg / sessionrelay / irc.go
Blame History Raw 613 lines
1
package sessionrelay
2
3
import (
4
"bytes"
5
"context"
6
"encoding/json"
7
"fmt"
8
"net"
9
"net/http"
10
"os"
11
"slices"
12
"strconv"
13
"strings"
14
"sync"
15
"time"
16
17
"github.com/lrstanley/girc"
18
)
19
20
type ircConnector struct {
21
http *http.Client
22
apiURL string
23
token string
24
primary string
25
nick string
26
addr string
27
agentType string
28
pass string
29
deleteOnClose bool
30
envelopeMode bool
31
32
mu sync.RWMutex
33
channels []string
34
messages []Message
35
client *girc.Client
36
errCh chan error
37
38
registeredByRelay bool
39
connectedAt time.Time
40
}
41
42
func newIRCConnector(cfg Config) (Connector, error) {
43
if cfg.IRC.Addr == "" {
44
return nil, fmt.Errorf("sessionrelay: irc transport requires irc addr")
45
}
46
return &ircConnector{
47
http: cfg.HTTPClient,
48
apiURL: stringsTrimRightSlash(cfg.URL),
49
token: cfg.Token,
50
primary: normalizeChannel(cfg.Channel),
51
nick: cfg.Nick,
52
addr: cfg.IRC.Addr,
53
agentType: cfg.IRC.AgentType,
54
pass: cfg.IRC.Pass,
55
deleteOnClose: cfg.IRC.DeleteOnClose,
56
envelopeMode: cfg.IRC.EnvelopeMode,
57
channels: append([]string(nil), cfg.Channels...),
58
messages: make([]Message, 0, defaultBufferSize),
59
errCh: make(chan error, 1),
60
}, nil
61
}
62
63
// Reconnect backoff bounds used by keepAlive: the delay starts at
// ircReconnectMin and doubles per attempt, capped at ircReconnectMax.
const (
	ircReconnectMin time.Duration = 2 * time.Second
	ircReconnectMax time.Duration = 30 * time.Second
)
67
68
func (c *ircConnector) Connect(ctx context.Context) error {
69
if err := c.ensureCredentials(ctx); err != nil {
70
return err
71
}
72
73
host, port, err := splitHostPort(c.addr)
74
if err != nil {
75
return err
76
}
77
78
joined := make(chan struct{})
79
var joinOnce sync.Once
80
c.dial(host, port, func() { joinOnce.Do(func() { close(joined) }) })
81
82
select {
83
case <-ctx.Done():
84
c.mu.Lock()
85
if c.client != nil {
86
c.client.Close()
87
}
88
c.mu.Unlock()
89
return ctx.Err()
90
case err := <-c.errCh:
91
_ = c.cleanupRegistration(context.Background())
92
return fmt.Errorf("sessionrelay: irc connect: %w", err)
93
case <-joined:
94
go c.keepAlive(ctx, host, port)
95
return nil
96
}
97
}
98
99
// dial creates a fresh girc client, wires up handlers, and starts the
100
// connection goroutine. onJoined fires once when the primary channel is
101
// joined — used as the initial-connect signal and to reset backoff on
102
// successful reconnects.
103
func (c *ircConnector) dial(host string, port int, onJoined func()) {
104
client := girc.New(girc.Config{
105
Server: host,
106
Port: port,
107
Nick: c.nick,
108
User: c.nick,
109
Name: c.nick + " (session relay)",
110
SASL: &girc.SASLPlain{User: c.nick, Pass: c.pass},
111
PingDelay: 30 * time.Second,
112
PingTimeout: 30 * time.Second,
113
})
114
client.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, _ girc.Event) {
115
c.mu.Lock()
116
c.connectedAt = time.Now()
117
c.mu.Unlock()
118
for _, channel := range c.Channels() {
119
cl.Cmd.Join(channel)
120
}
121
})
122
client.Handlers.AddBg(girc.JOIN, func(_ *girc.Client, e girc.Event) {
123
if len(e.Params) < 1 || e.Source == nil || e.Source.Name != c.nick {
124
return
125
}
126
if normalizeChannel(e.Params[0]) != c.primary {
127
return
128
}
129
if onJoined != nil {
130
onJoined()
131
}
132
})
133
client.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) {
134
if len(e.Params) < 1 || e.Source == nil {
135
return
136
}
137
target := normalizeChannel(e.Params[0])
138
if !c.hasChannel(target) {
139
return
140
}
141
// Prefer account-tag (IRCv3) over source nick.
142
sender := e.Source.Name
143
if acct, ok := e.Tags.Get("account"); ok && acct != "" {
144
sender = acct
145
}
146
text := strings.TrimSpace(e.Last())
147
// RELAYMSG: server delivers as "nick/bridge" — strip the relay suffix.
148
if sep, ok := cl.GetServerOption("RELAYMSG"); ok && sep != "" {
149
if idx := strings.Index(sender, sep); idx != -1 {
150
sender = sender[:idx]
151
}
152
}
153
// Fallback: parse legacy [nick] prefix from bridge bot.
154
if sender == "bridge" && strings.HasPrefix(text, "[") {
155
if end := strings.Index(text, "] "); end != -1 {
156
sender = text[1:end]
157
text = strings.TrimSpace(text[end+2:])
158
}
159
}
160
// Use server-time when available; fall back to local clock.
161
at := e.Timestamp
162
if at.IsZero() {
163
at = time.Now()
164
}
165
var msgID string
166
if id, ok := e.Tags.Get("msgid"); ok {
167
msgID = id
168
}
169
c.appendMessage(Message{At: at, Channel: target, Nick: sender, Text: text, MsgID: msgID})
170
})
171
172
c.mu.Lock()
173
c.client = client
174
c.mu.Unlock()
175
176
go func() {
177
err := client.Connect()
178
if err == nil {
179
err = fmt.Errorf("connection closed")
180
}
181
select {
182
case c.errCh <- err:
183
default:
184
}
185
}()
186
}
187
188
// keepAlive watches for connection errors and redials with exponential backoff.
189
// It stops when ctx is cancelled (i.e. the broker is shutting down).
190
func (c *ircConnector) keepAlive(ctx context.Context, host string, port int) {
191
wait := ircReconnectMin
192
for {
193
select {
194
case <-ctx.Done():
195
return
196
case err := <-c.errCh:
197
fmt.Fprintf(os.Stderr, "sessionrelay: connection lost: %v\n", err)
198
}
199
200
// Close the dead client before replacing it.
201
c.mu.Lock()
202
if c.client != nil {
203
c.client.Close()
204
c.client = nil
205
}
206
c.mu.Unlock()
207
208
select {
209
case <-ctx.Done():
210
return
211
case <-time.After(wait):
212
}
213
fmt.Fprintf(os.Stderr, "sessionrelay: reconnecting (backoff %v)...\n", wait)
214
215
// Re-register to get fresh SASL credentials in case the server
216
// restarted and the Ergo database was reset.
217
c.pass = "" // clear stale creds
218
if err := c.ensureCredentials(ctx); err != nil {
219
fmt.Fprintf(os.Stderr, "sessionrelay: reconnect credential refresh failed: %v\n", err)
220
wait = min(wait*2, ircReconnectMax)
221
// Push a synthetic error so the loop retries.
222
go func() {
223
select {
224
case c.errCh <- err:
225
default:
226
}
227
}()
228
continue
229
}
230
fmt.Fprintf(os.Stderr, "sessionrelay: credentials refreshed, dialing...\n")
231
232
wait = min(wait*2, ircReconnectMax)
233
c.dial(host, port, func() {
234
wait = ircReconnectMin
235
fmt.Fprintf(os.Stderr, "sessionrelay: reconnected successfully\n")
236
})
237
}
238
}
239
240
// Post sends text to every channel in the connector's list, with no metadata.
// The caller's ctx is forwarded rather than replaced by a background context.
func (c *ircConnector) Post(ctx context.Context, text string) error {
	return c.PostWithMeta(ctx, text, nil)
}
243
244
// PostTo sends text to a single channel, with no metadata. The caller's ctx is
// forwarded rather than replaced by a background context.
func (c *ircConnector) PostTo(ctx context.Context, channel, text string) error {
	return c.PostToWithMeta(ctx, channel, text, nil)
}
247
248
// PostWithMeta sends text to all channels.
249
// In envelope mode, wraps the message in a protocol.Envelope JSON.
250
func (c *ircConnector) PostWithMeta(_ context.Context, text string, meta json.RawMessage) error {
251
c.mu.RLock()
252
client := c.client
253
c.mu.RUnlock()
254
if client == nil {
255
return fmt.Errorf("sessionrelay: irc client not connected")
256
}
257
msg := c.formatMessage(text, meta)
258
for _, channel := range c.Channels() {
259
client.Cmd.Message(channel, msg)
260
}
261
return nil
262
}
263
264
// PostToWithMeta sends text to a specific channel.
265
func (c *ircConnector) PostToWithMeta(_ context.Context, channel, text string, meta json.RawMessage) error {
266
c.mu.RLock()
267
client := c.client
268
c.mu.RUnlock()
269
if client == nil {
270
return fmt.Errorf("sessionrelay: irc client not connected")
271
}
272
channel = normalizeChannel(channel)
273
if channel == "" {
274
return fmt.Errorf("sessionrelay: post channel is required")
275
}
276
client.Cmd.Message(channel, c.formatMessage(text, meta))
277
return nil
278
}
279
280
// formatMessage wraps text in a JSON envelope when envelope mode is enabled.
281
func (c *ircConnector) formatMessage(text string, meta json.RawMessage) string {
282
if !c.envelopeMode {
283
return text
284
}
285
env := map[string]any{
286
"v": 1,
287
"type": "relay.message",
288
"from": c.nick,
289
"ts": time.Now().UnixMilli(),
290
"payload": map[string]any{
291
"text": text,
292
},
293
}
294
if len(meta) > 0 {
295
env["payload"] = json.RawMessage(meta)
296
}
297
data, err := json.Marshal(env)
298
if err != nil {
299
return text // fallback to plain text
300
}
301
return string(data)
302
}
303
304
// MessagesSince returns the buffered messages whose timestamp is strictly
// after since, in buffer order. The result is a new, non-nil slice and the
// error is always nil.
func (c *ircConnector) MessagesSince(_ context.Context, since time.Time) ([]Message, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	recent := make([]Message, 0, len(c.messages))
	for i := range c.messages {
		if !c.messages[i].At.After(since) {
			continue
		}
		recent = append(recent, c.messages[i])
	}
	return recent, nil
}
316
317
func (c *ircConnector) Touch(ctx context.Context) error {
318
c.mu.RLock()
319
client := c.client
320
c.mu.RUnlock()
321
322
if client == nil {
323
return fmt.Errorf("sessionrelay: not connected")
324
}
325
326
if !client.IsConnected() {
327
client.Close()
328
select {
329
case c.errCh <- fmt.Errorf("touch: client disconnected"):
330
default:
331
}
332
return fmt.Errorf("sessionrelay: disconnected")
333
}
334
335
// Detect server restarts by checking the server's startup time.
336
// If the server started after our IRC connection was established,
337
// the IRC connection is stale and must be recycled.
338
if c.apiURL != "" && c.token != "" {
339
probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
340
defer cancel()
341
req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, c.apiURL+"/v1/status", nil)
342
if err != nil {
343
return nil
344
}
345
req.Header.Set("Authorization", "Bearer "+c.token)
346
resp, err := http.DefaultClient.Do(req)
347
if err != nil {
348
return nil // API unreachable, transient
349
}
350
defer resp.Body.Close()
351
352
var status struct {
353
Started string `json:"started"`
354
}
355
if err := json.NewDecoder(resp.Body).Decode(&status); err == nil && status.Started != "" {
356
serverStart, err := time.Parse(time.RFC3339Nano, status.Started)
357
if err == nil {
358
c.mu.RLock()
359
connectedAt := c.connectedAt
360
c.mu.RUnlock()
361
if !connectedAt.IsZero() && serverStart.After(connectedAt) {
362
// Server restarted after we connected — our IRC session is dead.
363
client.Close()
364
select {
365
case c.errCh <- fmt.Errorf("touch: server restarted (started %s, connected %s)", serverStart.Format(time.RFC3339), connectedAt.Format(time.RFC3339)):
366
default:
367
}
368
return fmt.Errorf("sessionrelay: server restarted")
369
}
370
}
371
}
372
373
// Also touch presence so the server tracks us.
374
presenceReq, _ := http.NewRequestWithContext(probeCtx, http.MethodPost,
375
c.apiURL+"/v1/channels/"+channelSlug(c.primary)+"/presence",
376
bytes.NewReader([]byte(`{"nick":"`+c.nick+`"}`)))
377
if presenceReq != nil {
378
presenceReq.Header.Set("Authorization", "Bearer "+c.token)
379
presenceReq.Header.Set("Content-Type", "application/json")
380
pr, err := http.DefaultClient.Do(presenceReq)
381
if pr != nil {
382
pr.Body.Close()
383
}
384
_ = err
385
}
386
}
387
388
return nil
389
}
390
391
// JoinChannel adds channel (after normalization) to the connector's list,
// sends a JOIN if a client is set, and pushes the new list to the registry in
// a background goroutine. Joining a channel already in the list is a no-op.
// An empty normalized name is an error.
func (c *ircConnector) JoinChannel(ctx context.Context, channel string) error {
	channel = normalizeChannel(channel)
	if channel == "" {
		return fmt.Errorf("sessionrelay: join channel is required")
	}

	c.mu.Lock()
	already := slices.Contains(c.channels, channel)
	if !already {
		c.channels = append(c.channels, channel)
	}
	client := c.client
	c.mu.Unlock()

	if already {
		return nil
	}
	if client != nil {
		client.Cmd.Join(channel)
	}
	// NOTE(review): the goroutine reuses the caller's ctx; if that ctx is
	// cancelled when the caller returns, the sync request may be aborted —
	// confirm against callers.
	go c.syncChannelsToRegistry(ctx)
	return nil
}
410
411
// PartChannel removes channel (after normalization) from the connector's list,
// sends a PART if a client is set, and pushes the new list to the registry in
// a background goroutine. Parting a channel that is not in the list is a
// no-op. An empty name or the control channel is rejected with an error.
func (c *ircConnector) PartChannel(ctx context.Context, channel string) error {
	channel = normalizeChannel(channel)
	switch channel {
	case "":
		return fmt.Errorf("sessionrelay: part channel is required")
	case c.primary:
		return fmt.Errorf("sessionrelay: cannot part control channel %s", channel)
	}

	c.mu.Lock()
	joined := slices.Contains(c.channels, channel)
	if joined {
		// Drop every occurrence, compacting the list in place.
		c.channels = slices.DeleteFunc(c.channels, func(name string) bool {
			return name == channel
		})
	}
	client := c.client
	c.mu.Unlock()

	if !joined {
		return nil
	}
	if client != nil {
		client.Cmd.Part(channel)
	}
	// NOTE(review): the goroutine reuses the caller's ctx; if that ctx is
	// cancelled when the caller returns, the sync request may be aborted —
	// confirm against callers.
	go c.syncChannelsToRegistry(ctx)
	return nil
}
440
441
// syncChannelsToRegistry PATCHes the agent's channel list in the registry so
442
// the Agents tab stays in sync after live /join and /part commands.
443
func (c *ircConnector) syncChannelsToRegistry(ctx context.Context) {
444
if c.apiURL == "" || c.token == "" || c.nick == "" {
445
return
446
}
447
channels := c.Channels()
448
body, err := json.Marshal(map[string]any{"channels": channels})
449
if err != nil {
450
return
451
}
452
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, c.apiURL+"/v1/agents/"+c.nick, bytes.NewReader(body))
453
if err != nil {
454
return
455
}
456
req.Header.Set("Authorization", "Bearer "+c.token)
457
req.Header.Set("Content-Type", "application/json")
458
resp, err := c.http.Do(req)
459
if err != nil {
460
return
461
}
462
resp.Body.Close()
463
}
464
465
// Channels returns a copy of the connector's channel list, so callers may
// modify the result freely. The result is nil when the list is empty.
func (c *ircConnector) Channels() []string {
	c.mu.RLock()
	snapshot := append([]string(nil), c.channels...)
	c.mu.RUnlock()
	return snapshot
}
470
471
// ControlChannel returns the primary channel, which is set at construction and
// which PartChannel refuses to leave.
func (c *ircConnector) ControlChannel() string {
	primary := c.primary
	return primary
}
474
475
// Close shuts down the current IRC client, if any, and then runs
// cleanupRegistration with ctx, returning its result.
func (c *ircConnector) Close(ctx context.Context) error {
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		if c.client == nil {
			return
		}
		c.client.Close()
		c.client = nil
	}()

	return c.cleanupRegistration(ctx)
}
484
485
// appendMessage adds msg to the end of the message buffer. When the buffer
// already holds defaultBufferSize entries, the oldest one is discarded first
// by shifting the remaining entries down in place.
func (c *ircConnector) appendMessage(msg Message) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if len(c.messages) == defaultBufferSize {
		kept := copy(c.messages, c.messages[1:])
		c.messages = c.messages[:kept]
	}
	c.messages = append(c.messages, msg)
}
494
495
// ensureCredentials makes sure c.pass is populated. An existing passphrase is
// kept as is. Otherwise one is obtained through registerOrRotate, which needs
// both apiURL and token; it is an error if either is missing. On success it
// also records whether this call created the registration.
func (c *ircConnector) ensureCredentials(ctx context.Context) error {
	switch {
	case c.pass != "":
		return nil
	case c.apiURL == "" || c.token == "":
		return fmt.Errorf("sessionrelay: irc transport requires irc pass or api url/token for auto-registration")
	}

	created, pass, err := c.registerOrRotate(ctx)
	if err != nil {
		return err
	}
	c.pass, c.registeredByRelay = pass, created
	return nil
}
511
512
func (c *ircConnector) registerOrRotate(ctx context.Context) (bool, string, error) {
513
body, _ := json.Marshal(map[string]any{
514
"nick": c.nick,
515
"type": c.agentType,
516
"channels": c.Channels(),
517
})
518
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.apiURL+"/v1/agents/register", bytes.NewReader(body))
519
if err != nil {
520
return false, "", err
521
}
522
req.Header.Set("Authorization", "Bearer "+c.token)
523
req.Header.Set("Content-Type", "application/json")
524
525
resp, err := c.http.Do(req)
526
if err != nil {
527
return false, "", err
528
}
529
defer resp.Body.Close()
530
531
var createdPayload struct {
532
Credentials struct {
533
Passphrase string `json:"passphrase"`
534
} `json:"credentials"`
535
}
536
if resp.StatusCode == http.StatusCreated {
537
if err := json.NewDecoder(resp.Body).Decode(&createdPayload); err != nil {
538
return false, "", err
539
}
540
if createdPayload.Credentials.Passphrase == "" {
541
return false, "", fmt.Errorf("sessionrelay: register %s: empty passphrase", c.nick)
542
}
543
return true, createdPayload.Credentials.Passphrase, nil
544
}
545
if resp.StatusCode != http.StatusConflict {
546
return false, "", fmt.Errorf("sessionrelay: register %s: %s", c.nick, resp.Status)
547
}
548
549
rotateReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.apiURL+"/v1/agents/"+c.nick+"/rotate", nil)
550
if err != nil {
551
return false, "", err
552
}
553
rotateReq.Header.Set("Authorization", "Bearer "+c.token)
554
rotateResp, err := c.http.Do(rotateReq)
555
if err != nil {
556
return false, "", err
557
}
558
defer rotateResp.Body.Close()
559
if rotateResp.StatusCode != http.StatusOK {
560
return false, "", fmt.Errorf("sessionrelay: rotate %s: %s", c.nick, rotateResp.Status)
561
}
562
563
var rotated struct {
564
Passphrase string `json:"passphrase"`
565
}
566
if err := json.NewDecoder(rotateResp.Body).Decode(&rotated); err != nil {
567
return false, "", err
568
}
569
if rotated.Passphrase == "" {
570
return false, "", fmt.Errorf("sessionrelay: rotate %s: empty passphrase", c.nick)
571
}
572
return false, rotated.Passphrase, nil
573
}
574
575
// cleanupRegistration deletes the agent registration via DELETE
// /v1/agents/<nick>. It does so only when deleteOnClose is set, this relay
// created the registration, and both apiURL and token are present; otherwise
// it does nothing. A 2xx or 404 response counts as success and clears
// registeredByRelay; any other status is returned as an error.
func (c *ircConnector) cleanupRegistration(ctx context.Context) error {
	wanted := c.deleteOnClose && c.registeredByRelay
	reachable := c.apiURL != "" && c.token != ""
	if !wanted || !reachable {
		return nil
	}

	endpoint := c.apiURL + "/v1/agents/" + c.nick
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+c.token)

	resp, err := c.http.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	deleted := resp.StatusCode/100 == 2 || resp.StatusCode == http.StatusNotFound
	if !deleted {
		return fmt.Errorf("sessionrelay: delete %s: %s", c.nick, resp.Status)
	}
	c.registeredByRelay = false
	return nil
}
595
596
// hasChannel reports whether channel is in the connector's channel list. The
// comparison is exact, so callers pass an already-normalized name.
func (c *ircConnector) hasChannel(channel string) bool {
	c.mu.RLock()
	found := slices.Index(c.channels, channel) >= 0
	c.mu.RUnlock()
	return found
}
601
602
// splitHostPort splits addr of the form "host:port" (or "[ipv6]:port") into
// its host and numeric port.
//
// It returns an error when addr cannot be split, when the port is not a
// decimal integer, or when the port lies outside 0-65535. The range check
// rejects values such as "-1" or "70000" here instead of letting them reach
// the dialer.
func splitHostPort(addr string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("sessionrelay: invalid irc address %q: %w", addr, err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, fmt.Errorf("sessionrelay: invalid irc port in %q: %w", addr, err)
	}
	if port < 0 || port > 65535 {
		return "", 0, fmt.Errorf("sessionrelay: invalid irc port in %q: %d is outside 0-65535", addr, port)
	}
	return host, port, nil
}
613

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button