|
24a217e…
|
lmata
|
1 |
package sessionrelay |
|
24a217e…
|
lmata
|
2 |
|
|
24a217e…
|
lmata
|
3 |
import ( |
|
24a217e…
|
lmata
|
4 |
"bytes" |
|
24a217e…
|
lmata
|
5 |
"context" |
|
24a217e…
|
lmata
|
6 |
"encoding/json" |
|
24a217e…
|
lmata
|
7 |
"fmt" |
|
24a217e…
|
lmata
|
8 |
"net" |
|
24a217e…
|
lmata
|
9 |
"net/http" |
|
54be8d2…
|
lmata
|
10 |
"os" |
|
1d3caa2…
|
lmata
|
11 |
"slices" |
|
24a217e…
|
lmata
|
12 |
"strconv" |
|
24a217e…
|
lmata
|
13 |
"strings" |
|
24a217e…
|
lmata
|
14 |
"sync" |
|
24a217e…
|
lmata
|
15 |
"time" |
|
24a217e…
|
lmata
|
16 |
|
|
24a217e…
|
lmata
|
17 |
"github.com/lrstanley/girc" |
|
24a217e…
|
lmata
|
18 |
) |
|
24a217e…
|
lmata
|
19 |
|
|
24a217e…
|
lmata
|
20 |
type ircConnector struct { |
|
24a217e…
|
lmata
|
21 |
http *http.Client |
|
24a217e…
|
lmata
|
22 |
apiURL string |
|
24a217e…
|
lmata
|
23 |
token string |
|
1d3caa2…
|
lmata
|
24 |
primary string |
|
24a217e…
|
lmata
|
25 |
nick string |
|
24a217e…
|
lmata
|
26 |
addr string |
|
24a217e…
|
lmata
|
27 |
agentType string |
|
24a217e…
|
lmata
|
28 |
pass string |
|
24a217e…
|
lmata
|
29 |
deleteOnClose bool |
|
a027855…
|
noreply
|
30 |
envelopeMode bool |
|
24a217e…
|
lmata
|
31 |
|
|
24a217e…
|
lmata
|
32 |
mu sync.RWMutex |
|
1d3caa2…
|
lmata
|
33 |
channels []string |
|
24a217e…
|
lmata
|
34 |
messages []Message |
|
24a217e…
|
lmata
|
35 |
client *girc.Client |
|
24a217e…
|
lmata
|
36 |
errCh chan error |
|
24a217e…
|
lmata
|
37 |
|
|
24a217e…
|
lmata
|
38 |
registeredByRelay bool |
|
9f5df4d…
|
lmata
|
39 |
connectedAt time.Time |
|
24a217e…
|
lmata
|
40 |
} |
|
24a217e…
|
lmata
|
41 |
|
|
24a217e…
|
lmata
|
42 |
func newIRCConnector(cfg Config) (Connector, error) { |
|
24a217e…
|
lmata
|
43 |
if cfg.IRC.Addr == "" { |
|
24a217e…
|
lmata
|
44 |
return nil, fmt.Errorf("sessionrelay: irc transport requires irc addr") |
|
24a217e…
|
lmata
|
45 |
} |
|
24a217e…
|
lmata
|
46 |
return &ircConnector{ |
|
24a217e…
|
lmata
|
47 |
http: cfg.HTTPClient, |
|
24a217e…
|
lmata
|
48 |
apiURL: stringsTrimRightSlash(cfg.URL), |
|
24a217e…
|
lmata
|
49 |
token: cfg.Token, |
|
1d3caa2…
|
lmata
|
50 |
primary: normalizeChannel(cfg.Channel), |
|
24a217e…
|
lmata
|
51 |
nick: cfg.Nick, |
|
24a217e…
|
lmata
|
52 |
addr: cfg.IRC.Addr, |
|
24a217e…
|
lmata
|
53 |
agentType: cfg.IRC.AgentType, |
|
24a217e…
|
lmata
|
54 |
pass: cfg.IRC.Pass, |
|
24a217e…
|
lmata
|
55 |
deleteOnClose: cfg.IRC.DeleteOnClose, |
|
a027855…
|
noreply
|
56 |
envelopeMode: cfg.IRC.EnvelopeMode, |
|
1d3caa2…
|
lmata
|
57 |
channels: append([]string(nil), cfg.Channels...), |
|
24a217e…
|
lmata
|
58 |
messages: make([]Message, 0, defaultBufferSize), |
|
24a217e…
|
lmata
|
59 |
errCh: make(chan error, 1), |
|
24a217e…
|
lmata
|
60 |
}, nil |
|
24a217e…
|
lmata
|
61 |
} |
|
763c873…
|
lmata
|
62 |
|
|
763c873…
|
lmata
|
63 |
const ( |
|
763c873…
|
lmata
|
64 |
ircReconnectMin = 2 * time.Second |
|
763c873…
|
lmata
|
65 |
ircReconnectMax = 30 * time.Second |
|
763c873…
|
lmata
|
66 |
) |
|
24a217e…
|
lmata
|
67 |
|
|
24a217e…
|
lmata
|
68 |
func (c *ircConnector) Connect(ctx context.Context) error { |
|
24a217e…
|
lmata
|
69 |
if err := c.ensureCredentials(ctx); err != nil { |
|
24a217e…
|
lmata
|
70 |
return err |
|
24a217e…
|
lmata
|
71 |
} |
|
24a217e…
|
lmata
|
72 |
|
|
24a217e…
|
lmata
|
73 |
host, port, err := splitHostPort(c.addr) |
|
24a217e…
|
lmata
|
74 |
if err != nil { |
|
24a217e…
|
lmata
|
75 |
return err |
|
24a217e…
|
lmata
|
76 |
} |
|
24a217e…
|
lmata
|
77 |
|
|
24a217e…
|
lmata
|
78 |
joined := make(chan struct{}) |
|
24a217e…
|
lmata
|
79 |
var joinOnce sync.Once |
|
763c873…
|
lmata
|
80 |
c.dial(host, port, func() { joinOnce.Do(func() { close(joined) }) }) |
|
763c873…
|
lmata
|
81 |
|
|
763c873…
|
lmata
|
82 |
select { |
|
763c873…
|
lmata
|
83 |
case <-ctx.Done(): |
|
763c873…
|
lmata
|
84 |
c.mu.Lock() |
|
763c873…
|
lmata
|
85 |
if c.client != nil { |
|
763c873…
|
lmata
|
86 |
c.client.Close() |
|
763c873…
|
lmata
|
87 |
} |
|
763c873…
|
lmata
|
88 |
c.mu.Unlock() |
|
763c873…
|
lmata
|
89 |
return ctx.Err() |
|
763c873…
|
lmata
|
90 |
case err := <-c.errCh: |
|
763c873…
|
lmata
|
91 |
_ = c.cleanupRegistration(context.Background()) |
|
763c873…
|
lmata
|
92 |
return fmt.Errorf("sessionrelay: irc connect: %w", err) |
|
763c873…
|
lmata
|
93 |
case <-joined: |
|
763c873…
|
lmata
|
94 |
go c.keepAlive(ctx, host, port) |
|
763c873…
|
lmata
|
95 |
return nil |
|
763c873…
|
lmata
|
96 |
} |
|
763c873…
|
lmata
|
97 |
} |
|
763c873…
|
lmata
|
98 |
|
|
763c873…
|
lmata
|
99 |
// dial creates a fresh girc client, wires up handlers, and starts the |
|
763c873…
|
lmata
|
100 |
// connection goroutine. onJoined fires once when the primary channel is |
|
763c873…
|
lmata
|
101 |
// joined — used as the initial-connect signal and to reset backoff on |
|
763c873…
|
lmata
|
102 |
// successful reconnects. |
|
763c873…
|
lmata
|
103 |
func (c *ircConnector) dial(host string, port int, onJoined func()) { |
|
24a217e…
|
lmata
|
104 |
client := girc.New(girc.Config{ |
|
81587e6…
|
lmata
|
105 |
Server: host, |
|
81587e6…
|
lmata
|
106 |
Port: port, |
|
81587e6…
|
lmata
|
107 |
Nick: c.nick, |
|
81587e6…
|
lmata
|
108 |
User: c.nick, |
|
81587e6…
|
lmata
|
109 |
Name: c.nick + " (session relay)", |
|
81587e6…
|
lmata
|
110 |
SASL: &girc.SASLPlain{User: c.nick, Pass: c.pass}, |
|
81587e6…
|
lmata
|
111 |
PingDelay: 30 * time.Second, |
|
81587e6…
|
lmata
|
112 |
PingTimeout: 30 * time.Second, |
|
24a217e…
|
lmata
|
113 |
}) |
|
24a217e…
|
lmata
|
114 |
client.Handlers.AddBg(girc.CONNECTED, func(cl *girc.Client, _ girc.Event) { |
|
9f5df4d…
|
lmata
|
115 |
c.mu.Lock() |
|
9f5df4d…
|
lmata
|
116 |
c.connectedAt = time.Now() |
|
9f5df4d…
|
lmata
|
117 |
c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
118 |
for _, channel := range c.Channels() { |
|
1d3caa2…
|
lmata
|
119 |
cl.Cmd.Join(channel) |
|
1d3caa2…
|
lmata
|
120 |
} |
|
24a217e…
|
lmata
|
121 |
}) |
|
24a217e…
|
lmata
|
122 |
client.Handlers.AddBg(girc.JOIN, func(_ *girc.Client, e girc.Event) { |
|
24a217e…
|
lmata
|
123 |
if len(e.Params) < 1 || e.Source == nil || e.Source.Name != c.nick { |
|
24a217e…
|
lmata
|
124 |
return |
|
24a217e…
|
lmata
|
125 |
} |
|
1d3caa2…
|
lmata
|
126 |
if normalizeChannel(e.Params[0]) != c.primary { |
|
24a217e…
|
lmata
|
127 |
return |
|
24a217e…
|
lmata
|
128 |
} |
|
763c873…
|
lmata
|
129 |
if onJoined != nil { |
|
763c873…
|
lmata
|
130 |
onJoined() |
|
763c873…
|
lmata
|
131 |
} |
|
24a217e…
|
lmata
|
132 |
}) |
|
c3c693d…
|
noreply
|
133 |
client.Handlers.AddBg(girc.PRIVMSG, func(cl *girc.Client, e girc.Event) { |
|
24a217e…
|
lmata
|
134 |
if len(e.Params) < 1 || e.Source == nil { |
|
24a217e…
|
lmata
|
135 |
return |
|
24a217e…
|
lmata
|
136 |
} |
|
24a217e…
|
lmata
|
137 |
target := normalizeChannel(e.Params[0]) |
|
1d3caa2…
|
lmata
|
138 |
if !c.hasChannel(target) { |
|
24a217e…
|
lmata
|
139 |
return |
|
24a217e…
|
lmata
|
140 |
} |
|
f64fe5f…
|
noreply
|
141 |
// Prefer account-tag (IRCv3) over source nick. |
|
24a217e…
|
lmata
|
142 |
sender := e.Source.Name |
|
f64fe5f…
|
noreply
|
143 |
if acct, ok := e.Tags.Get("account"); ok && acct != "" { |
|
f64fe5f…
|
noreply
|
144 |
sender = acct |
|
f64fe5f…
|
noreply
|
145 |
} |
|
24a217e…
|
lmata
|
146 |
text := strings.TrimSpace(e.Last()) |
|
c3c693d…
|
noreply
|
147 |
// RELAYMSG: server delivers as "nick/bridge" — strip the relay suffix. |
|
c3c693d…
|
noreply
|
148 |
if sep, ok := cl.GetServerOption("RELAYMSG"); ok && sep != "" { |
|
c3c693d…
|
noreply
|
149 |
if idx := strings.Index(sender, sep); idx != -1 { |
|
c3c693d…
|
noreply
|
150 |
sender = sender[:idx] |
|
c3c693d…
|
noreply
|
151 |
} |
|
c3c693d…
|
noreply
|
152 |
} |
|
c3c693d…
|
noreply
|
153 |
// Fallback: parse legacy [nick] prefix from bridge bot. |
|
24a217e…
|
lmata
|
154 |
if sender == "bridge" && strings.HasPrefix(text, "[") { |
|
24a217e…
|
lmata
|
155 |
if end := strings.Index(text, "] "); end != -1 { |
|
24a217e…
|
lmata
|
156 |
sender = text[1:end] |
|
24a217e…
|
lmata
|
157 |
text = strings.TrimSpace(text[end+2:]) |
|
24a217e…
|
lmata
|
158 |
} |
|
24a217e…
|
lmata
|
159 |
} |
|
f64fe5f…
|
noreply
|
160 |
// Use server-time when available; fall back to local clock. |
|
f64fe5f…
|
noreply
|
161 |
at := e.Timestamp |
|
f64fe5f…
|
noreply
|
162 |
if at.IsZero() { |
|
f64fe5f…
|
noreply
|
163 |
at = time.Now() |
|
f64fe5f…
|
noreply
|
164 |
} |
|
f64fe5f…
|
noreply
|
165 |
var msgID string |
|
f64fe5f…
|
noreply
|
166 |
if id, ok := e.Tags.Get("msgid"); ok { |
|
f64fe5f…
|
noreply
|
167 |
msgID = id |
|
f64fe5f…
|
noreply
|
168 |
} |
|
f64fe5f…
|
noreply
|
169 |
c.appendMessage(Message{At: at, Channel: target, Nick: sender, Text: text, MsgID: msgID}) |
|
24a217e…
|
lmata
|
170 |
}) |
|
24a217e…
|
lmata
|
171 |
|
|
763c873…
|
lmata
|
172 |
c.mu.Lock() |
|
24a217e…
|
lmata
|
173 |
c.client = client |
|
763c873…
|
lmata
|
174 |
c.mu.Unlock() |
|
763c873…
|
lmata
|
175 |
|
|
24a217e…
|
lmata
|
176 |
go func() { |
|
81587e6…
|
lmata
|
177 |
err := client.Connect() |
|
81587e6…
|
lmata
|
178 |
if err == nil { |
|
81587e6…
|
lmata
|
179 |
err = fmt.Errorf("connection closed") |
|
81587e6…
|
lmata
|
180 |
} |
|
81587e6…
|
lmata
|
181 |
select { |
|
81587e6…
|
lmata
|
182 |
case c.errCh <- err: |
|
81587e6…
|
lmata
|
183 |
default: |
|
24a217e…
|
lmata
|
184 |
} |
|
24a217e…
|
lmata
|
185 |
}() |
|
763c873…
|
lmata
|
186 |
} |
|
763c873…
|
lmata
|
187 |
|
|
763c873…
|
lmata
|
188 |
// keepAlive watches for connection errors and redials with exponential backoff. |
|
763c873…
|
lmata
|
189 |
// It stops when ctx is cancelled (i.e. the broker is shutting down). |
|
763c873…
|
lmata
|
190 |
func (c *ircConnector) keepAlive(ctx context.Context, host string, port int) { |
|
763c873…
|
lmata
|
191 |
wait := ircReconnectMin |
|
763c873…
|
lmata
|
192 |
for { |
|
763c873…
|
lmata
|
193 |
select { |
|
763c873…
|
lmata
|
194 |
case <-ctx.Done(): |
|
763c873…
|
lmata
|
195 |
return |
|
5336d33…
|
lmata
|
196 |
case err := <-c.errCh: |
|
5336d33…
|
lmata
|
197 |
fmt.Fprintf(os.Stderr, "sessionrelay: connection lost: %v\n", err) |
|
763c873…
|
lmata
|
198 |
} |
|
763c873…
|
lmata
|
199 |
|
|
763c873…
|
lmata
|
200 |
// Close the dead client before replacing it. |
|
763c873…
|
lmata
|
201 |
c.mu.Lock() |
|
763c873…
|
lmata
|
202 |
if c.client != nil { |
|
763c873…
|
lmata
|
203 |
c.client.Close() |
|
763c873…
|
lmata
|
204 |
c.client = nil |
|
763c873…
|
lmata
|
205 |
} |
|
763c873…
|
lmata
|
206 |
c.mu.Unlock() |
|
763c873…
|
lmata
|
207 |
|
|
763c873…
|
lmata
|
208 |
select { |
|
763c873…
|
lmata
|
209 |
case <-ctx.Done(): |
|
763c873…
|
lmata
|
210 |
return |
|
763c873…
|
lmata
|
211 |
case <-time.After(wait): |
|
763c873…
|
lmata
|
212 |
} |
|
5336d33…
|
lmata
|
213 |
fmt.Fprintf(os.Stderr, "sessionrelay: reconnecting (backoff %v)...\n", wait) |
|
54be8d2…
|
lmata
|
214 |
|
|
54be8d2…
|
lmata
|
215 |
// Re-register to get fresh SASL credentials in case the server |
|
54be8d2…
|
lmata
|
216 |
// restarted and the Ergo database was reset. |
|
54be8d2…
|
lmata
|
217 |
c.pass = "" // clear stale creds |
|
54be8d2…
|
lmata
|
218 |
if err := c.ensureCredentials(ctx); err != nil { |
|
54be8d2…
|
lmata
|
219 |
fmt.Fprintf(os.Stderr, "sessionrelay: reconnect credential refresh failed: %v\n", err) |
|
54be8d2…
|
lmata
|
220 |
wait = min(wait*2, ircReconnectMax) |
|
54be8d2…
|
lmata
|
221 |
// Push a synthetic error so the loop retries. |
|
54be8d2…
|
lmata
|
222 |
go func() { |
|
54be8d2…
|
lmata
|
223 |
select { |
|
54be8d2…
|
lmata
|
224 |
case c.errCh <- err: |
|
54be8d2…
|
lmata
|
225 |
default: |
|
54be8d2…
|
lmata
|
226 |
} |
|
54be8d2…
|
lmata
|
227 |
}() |
|
54be8d2…
|
lmata
|
228 |
continue |
|
54be8d2…
|
lmata
|
229 |
} |
|
d924aea…
|
lmata
|
230 |
fmt.Fprintf(os.Stderr, "sessionrelay: credentials refreshed, dialing...\n") |
|
54be8d2…
|
lmata
|
231 |
|
|
763c873…
|
lmata
|
232 |
wait = min(wait*2, ircReconnectMax) |
|
5336d33…
|
lmata
|
233 |
c.dial(host, port, func() { |
|
5336d33…
|
lmata
|
234 |
wait = ircReconnectMin |
|
5336d33…
|
lmata
|
235 |
fmt.Fprintf(os.Stderr, "sessionrelay: reconnected successfully\n") |
|
5336d33…
|
lmata
|
236 |
}) |
|
24a217e…
|
lmata
|
237 |
} |
|
24a217e…
|
lmata
|
238 |
} |
|
24a217e…
|
lmata
|
239 |
|
|
24a217e…
|
lmata
|
240 |
func (c *ircConnector) Post(_ context.Context, text string) error { |
|
f3c383e…
|
noreply
|
241 |
return c.PostWithMeta(context.Background(), text, nil) |
|
f3c383e…
|
noreply
|
242 |
} |
|
f3c383e…
|
noreply
|
243 |
|
|
f3c383e…
|
noreply
|
244 |
func (c *ircConnector) PostTo(_ context.Context, channel, text string) error { |
|
f3c383e…
|
noreply
|
245 |
return c.PostToWithMeta(context.Background(), channel, text, nil) |
|
f3c383e…
|
noreply
|
246 |
} |
|
f3c383e…
|
noreply
|
247 |
|
|
a027855…
|
noreply
|
248 |
// PostWithMeta sends text to all channels. |
|
a027855…
|
noreply
|
249 |
// In envelope mode, wraps the message in a protocol.Envelope JSON. |
|
a027855…
|
noreply
|
250 |
func (c *ircConnector) PostWithMeta(_ context.Context, text string, meta json.RawMessage) error { |
|
763c873…
|
lmata
|
251 |
c.mu.RLock() |
|
763c873…
|
lmata
|
252 |
client := c.client |
|
763c873…
|
lmata
|
253 |
c.mu.RUnlock() |
|
763c873…
|
lmata
|
254 |
if client == nil { |
|
763c873…
|
lmata
|
255 |
return fmt.Errorf("sessionrelay: irc client not connected") |
|
763c873…
|
lmata
|
256 |
} |
|
a027855…
|
noreply
|
257 |
msg := c.formatMessage(text, meta) |
|
763c873…
|
lmata
|
258 |
for _, channel := range c.Channels() { |
|
a027855…
|
noreply
|
259 |
client.Cmd.Message(channel, msg) |
|
763c873…
|
lmata
|
260 |
} |
|
763c873…
|
lmata
|
261 |
return nil |
|
763c873…
|
lmata
|
262 |
} |
|
763c873…
|
lmata
|
263 |
|
|
a027855…
|
noreply
|
264 |
// PostToWithMeta sends text to a specific channel. |
|
a027855…
|
noreply
|
265 |
func (c *ircConnector) PostToWithMeta(_ context.Context, channel, text string, meta json.RawMessage) error { |
|
763c873…
|
lmata
|
266 |
c.mu.RLock() |
|
763c873…
|
lmata
|
267 |
client := c.client |
|
763c873…
|
lmata
|
268 |
c.mu.RUnlock() |
|
763c873…
|
lmata
|
269 |
if client == nil { |
|
24a217e…
|
lmata
|
270 |
return fmt.Errorf("sessionrelay: irc client not connected") |
|
24a217e…
|
lmata
|
271 |
} |
|
1d3caa2…
|
lmata
|
272 |
channel = normalizeChannel(channel) |
|
1d3caa2…
|
lmata
|
273 |
if channel == "" { |
|
1d3caa2…
|
lmata
|
274 |
return fmt.Errorf("sessionrelay: post channel is required") |
|
1d3caa2…
|
lmata
|
275 |
} |
|
a027855…
|
noreply
|
276 |
client.Cmd.Message(channel, c.formatMessage(text, meta)) |
|
24a217e…
|
lmata
|
277 |
return nil |
|
a027855…
|
noreply
|
278 |
} |
|
a027855…
|
noreply
|
279 |
|
|
a027855…
|
noreply
|
280 |
// formatMessage wraps text in a JSON envelope when envelope mode is enabled. |
|
a027855…
|
noreply
|
281 |
func (c *ircConnector) formatMessage(text string, meta json.RawMessage) string { |
|
a027855…
|
noreply
|
282 |
if !c.envelopeMode { |
|
a027855…
|
noreply
|
283 |
return text |
|
a027855…
|
noreply
|
284 |
} |
|
a027855…
|
noreply
|
285 |
env := map[string]any{ |
|
a027855…
|
noreply
|
286 |
"v": 1, |
|
a027855…
|
noreply
|
287 |
"type": "relay.message", |
|
a027855…
|
noreply
|
288 |
"from": c.nick, |
|
a027855…
|
noreply
|
289 |
"ts": time.Now().UnixMilli(), |
|
a027855…
|
noreply
|
290 |
"payload": map[string]any{ |
|
a027855…
|
noreply
|
291 |
"text": text, |
|
a027855…
|
noreply
|
292 |
}, |
|
a027855…
|
noreply
|
293 |
} |
|
a027855…
|
noreply
|
294 |
if len(meta) > 0 { |
|
a027855…
|
noreply
|
295 |
env["payload"] = json.RawMessage(meta) |
|
a027855…
|
noreply
|
296 |
} |
|
a027855…
|
noreply
|
297 |
data, err := json.Marshal(env) |
|
a027855…
|
noreply
|
298 |
if err != nil { |
|
a027855…
|
noreply
|
299 |
return text // fallback to plain text |
|
a027855…
|
noreply
|
300 |
} |
|
a027855…
|
noreply
|
301 |
return string(data) |
|
24a217e…
|
lmata
|
302 |
} |
|
24a217e…
|
lmata
|
303 |
|
|
24a217e…
|
lmata
|
304 |
func (c *ircConnector) MessagesSince(_ context.Context, since time.Time) ([]Message, error) { |
|
24a217e…
|
lmata
|
305 |
c.mu.RLock() |
|
24a217e…
|
lmata
|
306 |
defer c.mu.RUnlock() |
|
24a217e…
|
lmata
|
307 |
|
|
24a217e…
|
lmata
|
308 |
out := make([]Message, 0, len(c.messages)) |
|
24a217e…
|
lmata
|
309 |
for _, msg := range c.messages { |
|
24a217e…
|
lmata
|
310 |
if msg.At.After(since) { |
|
24a217e…
|
lmata
|
311 |
out = append(out, msg) |
|
24a217e…
|
lmata
|
312 |
} |
|
24a217e…
|
lmata
|
313 |
} |
|
24a217e…
|
lmata
|
314 |
return out, nil |
|
24a217e…
|
lmata
|
315 |
} |
|
24a217e…
|
lmata
|
316 |
|
|
9f5df4d…
|
lmata
|
317 |
func (c *ircConnector) Touch(ctx context.Context) error { |
|
9f5df4d…
|
lmata
|
318 |
c.mu.RLock() |
|
9f5df4d…
|
lmata
|
319 |
client := c.client |
|
9f5df4d…
|
lmata
|
320 |
c.mu.RUnlock() |
|
9f5df4d…
|
lmata
|
321 |
|
|
9f5df4d…
|
lmata
|
322 |
if client == nil { |
|
9f5df4d…
|
lmata
|
323 |
return fmt.Errorf("sessionrelay: not connected") |
|
9f5df4d…
|
lmata
|
324 |
} |
|
9f5df4d…
|
lmata
|
325 |
|
|
9f5df4d…
|
lmata
|
326 |
if !client.IsConnected() { |
|
9f5df4d…
|
lmata
|
327 |
client.Close() |
|
9f5df4d…
|
lmata
|
328 |
select { |
|
9f5df4d…
|
lmata
|
329 |
case c.errCh <- fmt.Errorf("touch: client disconnected"): |
|
9f5df4d…
|
lmata
|
330 |
default: |
|
9f5df4d…
|
lmata
|
331 |
} |
|
9f5df4d…
|
lmata
|
332 |
return fmt.Errorf("sessionrelay: disconnected") |
|
9f5df4d…
|
lmata
|
333 |
} |
|
9f5df4d…
|
lmata
|
334 |
|
|
9f5df4d…
|
lmata
|
335 |
// Detect server restarts by checking the server's startup time. |
|
9f5df4d…
|
lmata
|
336 |
// If the server started after our IRC connection was established, |
|
9f5df4d…
|
lmata
|
337 |
// the IRC connection is stale and must be recycled. |
|
9f5df4d…
|
lmata
|
338 |
if c.apiURL != "" && c.token != "" { |
|
9f5df4d…
|
lmata
|
339 |
probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) |
|
9f5df4d…
|
lmata
|
340 |
defer cancel() |
|
9f5df4d…
|
lmata
|
341 |
req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, c.apiURL+"/v1/status", nil) |
|
9f5df4d…
|
lmata
|
342 |
if err != nil { |
|
9f5df4d…
|
lmata
|
343 |
return nil |
|
9f5df4d…
|
lmata
|
344 |
} |
|
9f5df4d…
|
lmata
|
345 |
req.Header.Set("Authorization", "Bearer "+c.token) |
|
9f5df4d…
|
lmata
|
346 |
resp, err := http.DefaultClient.Do(req) |
|
9f5df4d…
|
lmata
|
347 |
if err != nil { |
|
9f5df4d…
|
lmata
|
348 |
return nil // API unreachable, transient |
|
9f5df4d…
|
lmata
|
349 |
} |
|
9f5df4d…
|
lmata
|
350 |
defer resp.Body.Close() |
|
9f5df4d…
|
lmata
|
351 |
|
|
9f5df4d…
|
lmata
|
352 |
var status struct { |
|
9f5df4d…
|
lmata
|
353 |
Started string `json:"started"` |
|
9f5df4d…
|
lmata
|
354 |
} |
|
9f5df4d…
|
lmata
|
355 |
if err := json.NewDecoder(resp.Body).Decode(&status); err == nil && status.Started != "" { |
|
9f5df4d…
|
lmata
|
356 |
serverStart, err := time.Parse(time.RFC3339Nano, status.Started) |
|
9f5df4d…
|
lmata
|
357 |
if err == nil { |
|
9f5df4d…
|
lmata
|
358 |
c.mu.RLock() |
|
9f5df4d…
|
lmata
|
359 |
connectedAt := c.connectedAt |
|
9f5df4d…
|
lmata
|
360 |
c.mu.RUnlock() |
|
9f5df4d…
|
lmata
|
361 |
if !connectedAt.IsZero() && serverStart.After(connectedAt) { |
|
9f5df4d…
|
lmata
|
362 |
// Server restarted after we connected — our IRC session is dead. |
|
9f5df4d…
|
lmata
|
363 |
client.Close() |
|
9f5df4d…
|
lmata
|
364 |
select { |
|
9f5df4d…
|
lmata
|
365 |
case c.errCh <- fmt.Errorf("touch: server restarted (started %s, connected %s)", serverStart.Format(time.RFC3339), connectedAt.Format(time.RFC3339)): |
|
9f5df4d…
|
lmata
|
366 |
default: |
|
9f5df4d…
|
lmata
|
367 |
} |
|
9f5df4d…
|
lmata
|
368 |
return fmt.Errorf("sessionrelay: server restarted") |
|
9f5df4d…
|
lmata
|
369 |
} |
|
9f5df4d…
|
lmata
|
370 |
} |
|
9f5df4d…
|
lmata
|
371 |
} |
|
9f5df4d…
|
lmata
|
372 |
|
|
9f5df4d…
|
lmata
|
373 |
// Also touch presence so the server tracks us. |
|
9f5df4d…
|
lmata
|
374 |
presenceReq, _ := http.NewRequestWithContext(probeCtx, http.MethodPost, |
|
9f5df4d…
|
lmata
|
375 |
c.apiURL+"/v1/channels/"+channelSlug(c.primary)+"/presence", |
|
9f5df4d…
|
lmata
|
376 |
bytes.NewReader([]byte(`{"nick":"`+c.nick+`"}`))) |
|
9f5df4d…
|
lmata
|
377 |
if presenceReq != nil { |
|
9f5df4d…
|
lmata
|
378 |
presenceReq.Header.Set("Authorization", "Bearer "+c.token) |
|
9f5df4d…
|
lmata
|
379 |
presenceReq.Header.Set("Content-Type", "application/json") |
|
9f5df4d…
|
lmata
|
380 |
pr, err := http.DefaultClient.Do(presenceReq) |
|
9f5df4d…
|
lmata
|
381 |
if pr != nil { |
|
9f5df4d…
|
lmata
|
382 |
pr.Body.Close() |
|
9f5df4d…
|
lmata
|
383 |
} |
|
9f5df4d…
|
lmata
|
384 |
_ = err |
|
9f5df4d…
|
lmata
|
385 |
} |
|
9f5df4d…
|
lmata
|
386 |
} |
|
9f5df4d…
|
lmata
|
387 |
|
|
1d3caa2…
|
lmata
|
388 |
return nil |
|
1d3caa2…
|
lmata
|
389 |
} |
|
1d3caa2…
|
lmata
|
390 |
|
|
87e6978…
|
lmata
|
391 |
func (c *ircConnector) JoinChannel(ctx context.Context, channel string) error { |
|
1d3caa2…
|
lmata
|
392 |
channel = normalizeChannel(channel) |
|
1d3caa2…
|
lmata
|
393 |
if channel == "" { |
|
1d3caa2…
|
lmata
|
394 |
return fmt.Errorf("sessionrelay: join channel is required") |
|
1d3caa2…
|
lmata
|
395 |
} |
|
1d3caa2…
|
lmata
|
396 |
c.mu.Lock() |
|
1d3caa2…
|
lmata
|
397 |
if slices.Contains(c.channels, channel) { |
|
1d3caa2…
|
lmata
|
398 |
c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
399 |
return nil |
|
1d3caa2…
|
lmata
|
400 |
} |
|
1d3caa2…
|
lmata
|
401 |
c.channels = append(c.channels, channel) |
|
1d3caa2…
|
lmata
|
402 |
client := c.client |
|
1d3caa2…
|
lmata
|
403 |
c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
404 |
if client != nil { |
|
1d3caa2…
|
lmata
|
405 |
client.Cmd.Join(channel) |
|
1d3caa2…
|
lmata
|
406 |
} |
|
87e6978…
|
lmata
|
407 |
go c.syncChannelsToRegistry(ctx) |
|
1d3caa2…
|
lmata
|
408 |
return nil |
|
1d3caa2…
|
lmata
|
409 |
} |
|
1d3caa2…
|
lmata
|
410 |
|
|
87e6978…
|
lmata
|
411 |
func (c *ircConnector) PartChannel(ctx context.Context, channel string) error { |
|
1d3caa2…
|
lmata
|
412 |
channel = normalizeChannel(channel) |
|
1d3caa2…
|
lmata
|
413 |
if channel == "" { |
|
1d3caa2…
|
lmata
|
414 |
return fmt.Errorf("sessionrelay: part channel is required") |
|
1d3caa2…
|
lmata
|
415 |
} |
|
1d3caa2…
|
lmata
|
416 |
if channel == c.primary { |
|
1d3caa2…
|
lmata
|
417 |
return fmt.Errorf("sessionrelay: cannot part control channel %s", channel) |
|
1d3caa2…
|
lmata
|
418 |
} |
|
1d3caa2…
|
lmata
|
419 |
c.mu.Lock() |
|
1d3caa2…
|
lmata
|
420 |
if !slices.Contains(c.channels, channel) { |
|
1d3caa2…
|
lmata
|
421 |
c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
422 |
return nil |
|
1d3caa2…
|
lmata
|
423 |
} |
|
1d3caa2…
|
lmata
|
424 |
filtered := c.channels[:0] |
|
1d3caa2…
|
lmata
|
425 |
for _, existing := range c.channels { |
|
1d3caa2…
|
lmata
|
426 |
if existing == channel { |
|
1d3caa2…
|
lmata
|
427 |
continue |
|
1d3caa2…
|
lmata
|
428 |
} |
|
1d3caa2…
|
lmata
|
429 |
filtered = append(filtered, existing) |
|
1d3caa2…
|
lmata
|
430 |
} |
|
1d3caa2…
|
lmata
|
431 |
c.channels = filtered |
|
1d3caa2…
|
lmata
|
432 |
client := c.client |
|
1d3caa2…
|
lmata
|
433 |
c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
434 |
if client != nil { |
|
1d3caa2…
|
lmata
|
435 |
client.Cmd.Part(channel) |
|
1d3caa2…
|
lmata
|
436 |
} |
|
87e6978…
|
lmata
|
437 |
go c.syncChannelsToRegistry(ctx) |
|
24a217e…
|
lmata
|
438 |
return nil |
|
87e6978…
|
lmata
|
439 |
} |
|
87e6978…
|
lmata
|
440 |
|
|
87e6978…
|
lmata
|
441 |
// syncChannelsToRegistry PATCHes the agent's channel list in the registry so |
|
87e6978…
|
lmata
|
442 |
// the Agents tab stays in sync after live /join and /part commands. |
|
87e6978…
|
lmata
|
443 |
func (c *ircConnector) syncChannelsToRegistry(ctx context.Context) { |
|
87e6978…
|
lmata
|
444 |
if c.apiURL == "" || c.token == "" || c.nick == "" { |
|
87e6978…
|
lmata
|
445 |
return |
|
87e6978…
|
lmata
|
446 |
} |
|
87e6978…
|
lmata
|
447 |
channels := c.Channels() |
|
87e6978…
|
lmata
|
448 |
body, err := json.Marshal(map[string]any{"channels": channels}) |
|
87e6978…
|
lmata
|
449 |
if err != nil { |
|
87e6978…
|
lmata
|
450 |
return |
|
87e6978…
|
lmata
|
451 |
} |
|
87e6978…
|
lmata
|
452 |
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, c.apiURL+"/v1/agents/"+c.nick, bytes.NewReader(body)) |
|
87e6978…
|
lmata
|
453 |
if err != nil { |
|
87e6978…
|
lmata
|
454 |
return |
|
87e6978…
|
lmata
|
455 |
} |
|
87e6978…
|
lmata
|
456 |
req.Header.Set("Authorization", "Bearer "+c.token) |
|
87e6978…
|
lmata
|
457 |
req.Header.Set("Content-Type", "application/json") |
|
87e6978…
|
lmata
|
458 |
resp, err := c.http.Do(req) |
|
87e6978…
|
lmata
|
459 |
if err != nil { |
|
87e6978…
|
lmata
|
460 |
return |
|
87e6978…
|
lmata
|
461 |
} |
|
87e6978…
|
lmata
|
462 |
resp.Body.Close() |
|
1d3caa2…
|
lmata
|
463 |
} |
|
1d3caa2…
|
lmata
|
464 |
|
|
1d3caa2…
|
lmata
|
465 |
func (c *ircConnector) Channels() []string { |
|
1d3caa2…
|
lmata
|
466 |
c.mu.RLock() |
|
1d3caa2…
|
lmata
|
467 |
defer c.mu.RUnlock() |
|
1d3caa2…
|
lmata
|
468 |
return append([]string(nil), c.channels...) |
|
1d3caa2…
|
lmata
|
469 |
} |
|
1d3caa2…
|
lmata
|
470 |
|
|
1d3caa2…
|
lmata
|
471 |
func (c *ircConnector) ControlChannel() string { |
|
1d3caa2…
|
lmata
|
472 |
return c.primary |
|
1d3caa2…
|
lmata
|
473 |
} |
|
1d3caa2…
|
lmata
|
474 |
|
|
24a217e…
|
lmata
|
475 |
func (c *ircConnector) Close(ctx context.Context) error { |
|
763c873…
|
lmata
|
476 |
c.mu.Lock() |
|
24a217e…
|
lmata
|
477 |
if c.client != nil { |
|
24a217e…
|
lmata
|
478 |
c.client.Close() |
|
763c873…
|
lmata
|
479 |
c.client = nil |
|
24a217e…
|
lmata
|
480 |
} |
|
763c873…
|
lmata
|
481 |
c.mu.Unlock() |
|
24a217e…
|
lmata
|
482 |
return c.cleanupRegistration(ctx) |
|
24a217e…
|
lmata
|
483 |
} |
|
24a217e…
|
lmata
|
484 |
|
|
24a217e…
|
lmata
|
485 |
func (c *ircConnector) appendMessage(msg Message) { |
|
24a217e…
|
lmata
|
486 |
c.mu.Lock() |
|
24a217e…
|
lmata
|
487 |
defer c.mu.Unlock() |
|
24a217e…
|
lmata
|
488 |
if len(c.messages) == defaultBufferSize { |
|
24a217e…
|
lmata
|
489 |
copy(c.messages, c.messages[1:]) |
|
24a217e…
|
lmata
|
490 |
c.messages = c.messages[:defaultBufferSize-1] |
|
24a217e…
|
lmata
|
491 |
} |
|
24a217e…
|
lmata
|
492 |
c.messages = append(c.messages, msg) |
|
24a217e…
|
lmata
|
493 |
} |
|
24a217e…
|
lmata
|
494 |
|
|
24a217e…
|
lmata
|
495 |
func (c *ircConnector) ensureCredentials(ctx context.Context) error { |
|
24a217e…
|
lmata
|
496 |
if c.pass != "" { |
|
24a217e…
|
lmata
|
497 |
return nil |
|
24a217e…
|
lmata
|
498 |
} |
|
24a217e…
|
lmata
|
499 |
if c.apiURL == "" || c.token == "" { |
|
24a217e…
|
lmata
|
500 |
return fmt.Errorf("sessionrelay: irc transport requires irc pass or api url/token for auto-registration") |
|
24a217e…
|
lmata
|
501 |
} |
|
24a217e…
|
lmata
|
502 |
|
|
24a217e…
|
lmata
|
503 |
created, pass, err := c.registerOrRotate(ctx) |
|
24a217e…
|
lmata
|
504 |
if err != nil { |
|
24a217e…
|
lmata
|
505 |
return err |
|
24a217e…
|
lmata
|
506 |
} |
|
24a217e…
|
lmata
|
507 |
c.pass = pass |
|
24a217e…
|
lmata
|
508 |
c.registeredByRelay = created |
|
24a217e…
|
lmata
|
509 |
return nil |
|
24a217e…
|
lmata
|
510 |
} |
|
24a217e…
|
lmata
|
511 |
|
|
24a217e…
|
lmata
|
512 |
func (c *ircConnector) registerOrRotate(ctx context.Context) (bool, string, error) { |
|
24a217e…
|
lmata
|
513 |
body, _ := json.Marshal(map[string]any{ |
|
24a217e…
|
lmata
|
514 |
"nick": c.nick, |
|
24a217e…
|
lmata
|
515 |
"type": c.agentType, |
|
1d3caa2…
|
lmata
|
516 |
"channels": c.Channels(), |
|
24a217e…
|
lmata
|
517 |
}) |
|
24a217e…
|
lmata
|
518 |
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.apiURL+"/v1/agents/register", bytes.NewReader(body)) |
|
24a217e…
|
lmata
|
519 |
if err != nil { |
|
24a217e…
|
lmata
|
520 |
return false, "", err |
|
24a217e…
|
lmata
|
521 |
} |
|
24a217e…
|
lmata
|
522 |
req.Header.Set("Authorization", "Bearer "+c.token) |
|
24a217e…
|
lmata
|
523 |
req.Header.Set("Content-Type", "application/json") |
|
24a217e…
|
lmata
|
524 |
|
|
24a217e…
|
lmata
|
525 |
resp, err := c.http.Do(req) |
|
24a217e…
|
lmata
|
526 |
if err != nil { |
|
24a217e…
|
lmata
|
527 |
return false, "", err |
|
24a217e…
|
lmata
|
528 |
} |
|
24a217e…
|
lmata
|
529 |
defer resp.Body.Close() |
|
24a217e…
|
lmata
|
530 |
|
|
24a217e…
|
lmata
|
531 |
var createdPayload struct { |
|
24a217e…
|
lmata
|
532 |
Credentials struct { |
|
24a217e…
|
lmata
|
533 |
Passphrase string `json:"passphrase"` |
|
24a217e…
|
lmata
|
534 |
} `json:"credentials"` |
|
24a217e…
|
lmata
|
535 |
} |
|
24a217e…
|
lmata
|
536 |
if resp.StatusCode == http.StatusCreated { |
|
24a217e…
|
lmata
|
537 |
if err := json.NewDecoder(resp.Body).Decode(&createdPayload); err != nil { |
|
24a217e…
|
lmata
|
538 |
return false, "", err |
|
24a217e…
|
lmata
|
539 |
} |
|
24a217e…
|
lmata
|
540 |
if createdPayload.Credentials.Passphrase == "" { |
|
24a217e…
|
lmata
|
541 |
return false, "", fmt.Errorf("sessionrelay: register %s: empty passphrase", c.nick) |
|
24a217e…
|
lmata
|
542 |
} |
|
24a217e…
|
lmata
|
543 |
return true, createdPayload.Credentials.Passphrase, nil |
|
24a217e…
|
lmata
|
544 |
} |
|
24a217e…
|
lmata
|
545 |
if resp.StatusCode != http.StatusConflict { |
|
24a217e…
|
lmata
|
546 |
return false, "", fmt.Errorf("sessionrelay: register %s: %s", c.nick, resp.Status) |
|
24a217e…
|
lmata
|
547 |
} |
|
24a217e…
|
lmata
|
548 |
|
|
24a217e…
|
lmata
|
549 |
rotateReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.apiURL+"/v1/agents/"+c.nick+"/rotate", nil) |
|
24a217e…
|
lmata
|
550 |
if err != nil { |
|
24a217e…
|
lmata
|
551 |
return false, "", err |
|
24a217e…
|
lmata
|
552 |
} |
|
24a217e…
|
lmata
|
553 |
rotateReq.Header.Set("Authorization", "Bearer "+c.token) |
|
24a217e…
|
lmata
|
554 |
rotateResp, err := c.http.Do(rotateReq) |
|
24a217e…
|
lmata
|
555 |
if err != nil { |
|
24a217e…
|
lmata
|
556 |
return false, "", err |
|
24a217e…
|
lmata
|
557 |
} |
|
24a217e…
|
lmata
|
558 |
defer rotateResp.Body.Close() |
|
24a217e…
|
lmata
|
559 |
if rotateResp.StatusCode != http.StatusOK { |
|
24a217e…
|
lmata
|
560 |
return false, "", fmt.Errorf("sessionrelay: rotate %s: %s", c.nick, rotateResp.Status) |
|
24a217e…
|
lmata
|
561 |
} |
|
24a217e…
|
lmata
|
562 |
|
|
24a217e…
|
lmata
|
563 |
var rotated struct { |
|
24a217e…
|
lmata
|
564 |
Passphrase string `json:"passphrase"` |
|
24a217e…
|
lmata
|
565 |
} |
|
24a217e…
|
lmata
|
566 |
if err := json.NewDecoder(rotateResp.Body).Decode(&rotated); err != nil { |
|
24a217e…
|
lmata
|
567 |
return false, "", err |
|
24a217e…
|
lmata
|
568 |
} |
|
24a217e…
|
lmata
|
569 |
if rotated.Passphrase == "" { |
|
24a217e…
|
lmata
|
570 |
return false, "", fmt.Errorf("sessionrelay: rotate %s: empty passphrase", c.nick) |
|
24a217e…
|
lmata
|
571 |
} |
|
24a217e…
|
lmata
|
572 |
return false, rotated.Passphrase, nil |
|
24a217e…
|
lmata
|
573 |
} |
|
24a217e…
|
lmata
|
574 |
|
|
24a217e…
|
lmata
|
575 |
func (c *ircConnector) cleanupRegistration(ctx context.Context) error { |
|
24a217e…
|
lmata
|
576 |
if !c.deleteOnClose || !c.registeredByRelay || c.apiURL == "" || c.token == "" { |
|
24a217e…
|
lmata
|
577 |
return nil |
|
24a217e…
|
lmata
|
578 |
} |
|
24a217e…
|
lmata
|
579 |
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.apiURL+"/v1/agents/"+c.nick, nil) |
|
24a217e…
|
lmata
|
580 |
if err != nil { |
|
24a217e…
|
lmata
|
581 |
return err |
|
24a217e…
|
lmata
|
582 |
} |
|
24a217e…
|
lmata
|
583 |
req.Header.Set("Authorization", "Bearer "+c.token) |
|
24a217e…
|
lmata
|
584 |
resp, err := c.http.Do(req) |
|
24a217e…
|
lmata
|
585 |
if err != nil { |
|
24a217e…
|
lmata
|
586 |
return err |
|
24a217e…
|
lmata
|
587 |
} |
|
24a217e…
|
lmata
|
588 |
defer resp.Body.Close() |
|
24a217e…
|
lmata
|
589 |
if resp.StatusCode/100 != 2 && resp.StatusCode != http.StatusNotFound { |
|
24a217e…
|
lmata
|
590 |
return fmt.Errorf("sessionrelay: delete %s: %s", c.nick, resp.Status) |
|
24a217e…
|
lmata
|
591 |
} |
|
24a217e…
|
lmata
|
592 |
c.registeredByRelay = false |
|
24a217e…
|
lmata
|
593 |
return nil |
|
1d3caa2…
|
lmata
|
594 |
} |
|
1d3caa2…
|
lmata
|
595 |
|
|
1d3caa2…
|
lmata
|
596 |
func (c *ircConnector) hasChannel(channel string) bool { |
|
1d3caa2…
|
lmata
|
597 |
c.mu.RLock() |
|
1d3caa2…
|
lmata
|
598 |
defer c.mu.RUnlock() |
|
1d3caa2…
|
lmata
|
599 |
return slices.Contains(c.channels, channel) |
|
24a217e…
|
lmata
|
600 |
} |
|
24a217e…
|
lmata
|
601 |
|
|
24a217e…
|
lmata
|
602 |
func splitHostPort(addr string) (string, int, error) { |
|
24a217e…
|
lmata
|
603 |
host, portStr, err := net.SplitHostPort(addr) |
|
24a217e…
|
lmata
|
604 |
if err != nil { |
|
24a217e…
|
lmata
|
605 |
return "", 0, fmt.Errorf("sessionrelay: invalid irc address %q: %w", addr, err) |
|
24a217e…
|
lmata
|
606 |
} |
|
24a217e…
|
lmata
|
607 |
port, err := strconv.Atoi(portStr) |
|
24a217e…
|
lmata
|
608 |
if err != nil { |
|
24a217e…
|
lmata
|
609 |
return "", 0, fmt.Errorf("sessionrelay: invalid irc port in %q: %w", addr, err) |
|
24a217e…
|
lmata
|
610 |
} |
|
24a217e…
|
lmata
|
611 |
return host, port, nil |
|
24a217e…
|
lmata
|
612 |
} |