|
24a217e…
|
lmata
|
1 |
package sessionrelay |
|
24a217e…
|
lmata
|
2 |
|
|
24a217e…
|
lmata
|
3 |
import ( |
|
24a217e…
|
lmata
|
4 |
"bytes" |
|
24a217e…
|
lmata
|
5 |
"context" |
|
24a217e…
|
lmata
|
6 |
"encoding/json" |
|
24a217e…
|
lmata
|
7 |
"errors" |
|
24a217e…
|
lmata
|
8 |
"fmt" |
|
24a217e…
|
lmata
|
9 |
"net/http" |
|
1d3caa2…
|
lmata
|
10 |
"slices" |
|
1d3caa2…
|
lmata
|
11 |
"sort" |
|
1d3caa2…
|
lmata
|
12 |
"sync" |
|
24a217e…
|
lmata
|
13 |
"time" |
|
24a217e…
|
lmata
|
14 |
) |
|
24a217e…
|
lmata
|
15 |
|
|
24a217e…
|
lmata
|
16 |
// httpConnector is the Connector implementation returned by newHTTPConnector.
// It talks to the relay over HTTP and authenticates every request with a
// bearer token.
type httpConnector struct {
	http    *http.Client // client used to send every request
	baseURL string       // API root, stored without trailing slashes
	token   string       // bearer token placed in the Authorization header
	primary string       // control channel; PartChannel refuses to leave it
	nick    string       // nick used for registration, posts and presence

	agentType             string // sent as "type" when registering the agent
	deleteOnClose         bool   // Close only deletes the agent when this is set
	registeredByConnector bool   // set once registerAgent receives 201 Created

	mu       sync.RWMutex // guards channels
	channels []string     // channels currently joined
}
|
24a217e…
|
lmata
|
30 |
|
|
24a217e…
|
lmata
|
31 |
// httpMessage is the wire form of one entry in the "messages" array that
// MessagesSince decodes from the relay's response.
type httpMessage struct {
	At   string `json:"at"`   // timestamp text; MessagesSince parses it as RFC 3339
	Nick string `json:"nick"` // nick attached to the message
	Text string `json:"text"` // message text
}
|
24a217e…
|
lmata
|
36 |
|
|
24a217e…
|
lmata
|
37 |
func newHTTPConnector(cfg Config) Connector { |
|
24a217e…
|
lmata
|
38 |
return &httpConnector{ |
|
763c873…
|
lmata
|
39 |
http: cfg.HTTPClient, |
|
763c873…
|
lmata
|
40 |
baseURL: stringsTrimRightSlash(cfg.URL), |
|
763c873…
|
lmata
|
41 |
token: cfg.Token, |
|
763c873…
|
lmata
|
42 |
primary: normalizeChannel(cfg.Channel), |
|
763c873…
|
lmata
|
43 |
nick: cfg.Nick, |
|
763c873…
|
lmata
|
44 |
agentType: cfg.IRC.AgentType, |
|
763c873…
|
lmata
|
45 |
deleteOnClose: cfg.IRC.DeleteOnClose, |
|
763c873…
|
lmata
|
46 |
channels: append([]string(nil), cfg.Channels...), |
|
24a217e…
|
lmata
|
47 |
} |
|
24a217e…
|
lmata
|
48 |
} |
|
24a217e…
|
lmata
|
49 |
|
|
763c873…
|
lmata
|
50 |
func (c *httpConnector) Connect(ctx context.Context) error { |
|
24a217e…
|
lmata
|
51 |
if c.baseURL == "" { |
|
24a217e…
|
lmata
|
52 |
return fmt.Errorf("sessionrelay: http transport requires url") |
|
24a217e…
|
lmata
|
53 |
} |
|
24a217e…
|
lmata
|
54 |
if c.token == "" { |
|
24a217e…
|
lmata
|
55 |
return fmt.Errorf("sessionrelay: http transport requires token") |
|
24a217e…
|
lmata
|
56 |
} |
|
763c873…
|
lmata
|
57 |
if c.nick != "" { |
|
763c873…
|
lmata
|
58 |
if err := c.registerAgent(ctx); err != nil { |
|
763c873…
|
lmata
|
59 |
return err |
|
763c873…
|
lmata
|
60 |
} |
|
763c873…
|
lmata
|
61 |
} |
|
763c873…
|
lmata
|
62 |
return nil |
|
763c873…
|
lmata
|
63 |
} |
|
763c873…
|
lmata
|
64 |
|
|
763c873…
|
lmata
|
65 |
func (c *httpConnector) registerAgent(ctx context.Context) error { |
|
763c873…
|
lmata
|
66 |
body, _ := json.Marshal(map[string]any{ |
|
763c873…
|
lmata
|
67 |
"nick": c.nick, |
|
763c873…
|
lmata
|
68 |
"type": c.agentType, |
|
763c873…
|
lmata
|
69 |
"channels": c.Channels(), |
|
763c873…
|
lmata
|
70 |
}) |
|
763c873…
|
lmata
|
71 |
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/v1/agents/register", bytes.NewReader(body)) |
|
763c873…
|
lmata
|
72 |
if err != nil { |
|
763c873…
|
lmata
|
73 |
return err |
|
763c873…
|
lmata
|
74 |
} |
|
763c873…
|
lmata
|
75 |
c.authorize(req) |
|
763c873…
|
lmata
|
76 |
req.Header.Set("Content-Type", "application/json") |
|
763c873…
|
lmata
|
77 |
|
|
763c873…
|
lmata
|
78 |
resp, err := c.http.Do(req) |
|
763c873…
|
lmata
|
79 |
if err != nil { |
|
763c873…
|
lmata
|
80 |
return err |
|
763c873…
|
lmata
|
81 |
} |
|
763c873…
|
lmata
|
82 |
defer resp.Body.Close() |
|
763c873…
|
lmata
|
83 |
|
|
763c873…
|
lmata
|
84 |
switch resp.StatusCode { |
|
763c873…
|
lmata
|
85 |
case http.StatusCreated: |
|
763c873…
|
lmata
|
86 |
c.registeredByConnector = true |
|
763c873…
|
lmata
|
87 |
case http.StatusConflict: |
|
763c873…
|
lmata
|
88 |
// agent already exists; registration is best-effort, not an error |
|
763c873…
|
lmata
|
89 |
default: |
|
763c873…
|
lmata
|
90 |
return fmt.Errorf("sessionrelay: register %s: %s", c.nick, resp.Status) |
|
763c873…
|
lmata
|
91 |
} |
|
24a217e…
|
lmata
|
92 |
return nil |
|
24a217e…
|
lmata
|
93 |
} |
|
24a217e…
|
lmata
|
94 |
|
|
24a217e…
|
lmata
|
95 |
func (c *httpConnector) Post(ctx context.Context, text string) error { |
|
f3c383e…
|
noreply
|
96 |
return c.PostWithMeta(ctx, text, nil) |
|
f3c383e…
|
noreply
|
97 |
} |
|
f3c383e…
|
noreply
|
98 |
|
|
f3c383e…
|
noreply
|
99 |
func (c *httpConnector) PostTo(ctx context.Context, channel, text string) error { |
|
f3c383e…
|
noreply
|
100 |
return c.PostToWithMeta(ctx, channel, text, nil) |
|
f3c383e…
|
noreply
|
101 |
} |
|
f3c383e…
|
noreply
|
102 |
|
|
f3c383e…
|
noreply
|
103 |
func (c *httpConnector) PostWithMeta(ctx context.Context, text string, meta json.RawMessage) error { |
|
1d3caa2…
|
lmata
|
104 |
for _, channel := range c.Channels() { |
|
f3c383e…
|
noreply
|
105 |
if err := c.PostToWithMeta(ctx, channel, text, meta); err != nil { |
|
1d3caa2…
|
lmata
|
106 |
return err |
|
1d3caa2…
|
lmata
|
107 |
} |
|
1d3caa2…
|
lmata
|
108 |
} |
|
1d3caa2…
|
lmata
|
109 |
return nil |
|
1d3caa2…
|
lmata
|
110 |
} |
|
1d3caa2…
|
lmata
|
111 |
|
|
f3c383e…
|
noreply
|
112 |
func (c *httpConnector) PostToWithMeta(ctx context.Context, channel, text string, meta json.RawMessage) error { |
|
1d3caa2…
|
lmata
|
113 |
channel = channelSlug(channel) |
|
1d3caa2…
|
lmata
|
114 |
if channel == "" { |
|
1d3caa2…
|
lmata
|
115 |
return fmt.Errorf("sessionrelay: post channel is required") |
|
1d3caa2…
|
lmata
|
116 |
} |
|
f3c383e…
|
noreply
|
117 |
body := map[string]any{ |
|
24a217e…
|
lmata
|
118 |
"nick": c.nick, |
|
24a217e…
|
lmata
|
119 |
"text": text, |
|
f3c383e…
|
noreply
|
120 |
} |
|
f3c383e…
|
noreply
|
121 |
if len(meta) > 0 { |
|
f3c383e…
|
noreply
|
122 |
body["meta"] = json.RawMessage(meta) |
|
f3c383e…
|
noreply
|
123 |
} |
|
f3c383e…
|
noreply
|
124 |
return c.postJSON(ctx, "/v1/channels/"+channel+"/messages", body) |
|
24a217e…
|
lmata
|
125 |
} |
|
24a217e…
|
lmata
|
126 |
|
|
24a217e…
|
lmata
|
127 |
func (c *httpConnector) MessagesSince(ctx context.Context, since time.Time) ([]Message, error) { |
|
1d3caa2…
|
lmata
|
128 |
out := make([]Message, 0, 32) |
|
1d3caa2…
|
lmata
|
129 |
for _, channel := range c.Channels() { |
|
b71f8ab…
|
lmata
|
130 |
url := c.baseURL + "/v1/channels/" + channelSlug(channel) + "/messages" |
|
b71f8ab…
|
lmata
|
131 |
if !since.IsZero() { |
|
b71f8ab…
|
lmata
|
132 |
url += "?since=" + since.UTC().Format(time.RFC3339Nano) |
|
b71f8ab…
|
lmata
|
133 |
} |
|
b71f8ab…
|
lmata
|
134 |
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) |
|
1d3caa2…
|
lmata
|
135 |
if err != nil { |
|
1d3caa2…
|
lmata
|
136 |
return nil, err |
|
1d3caa2…
|
lmata
|
137 |
} |
|
1d3caa2…
|
lmata
|
138 |
c.authorize(req) |
|
1d3caa2…
|
lmata
|
139 |
|
|
1d3caa2…
|
lmata
|
140 |
resp, err := c.http.Do(req) |
|
1d3caa2…
|
lmata
|
141 |
if err != nil { |
|
1d3caa2…
|
lmata
|
142 |
return nil, err |
|
1d3caa2…
|
lmata
|
143 |
} |
|
1d3caa2…
|
lmata
|
144 |
if resp.StatusCode/100 != 2 { |
|
1d3caa2…
|
lmata
|
145 |
resp.Body.Close() |
|
1d3caa2…
|
lmata
|
146 |
return nil, fmt.Errorf("sessionrelay: http messages: %s", resp.Status) |
|
1d3caa2…
|
lmata
|
147 |
} |
|
1d3caa2…
|
lmata
|
148 |
|
|
1d3caa2…
|
lmata
|
149 |
var payload struct { |
|
1d3caa2…
|
lmata
|
150 |
Messages []httpMessage `json:"messages"` |
|
1d3caa2…
|
lmata
|
151 |
} |
|
1d3caa2…
|
lmata
|
152 |
err = json.NewDecoder(resp.Body).Decode(&payload) |
|
1d3caa2…
|
lmata
|
153 |
resp.Body.Close() |
|
1d3caa2…
|
lmata
|
154 |
if err != nil { |
|
1d3caa2…
|
lmata
|
155 |
return nil, err |
|
1d3caa2…
|
lmata
|
156 |
} |
|
1d3caa2…
|
lmata
|
157 |
|
|
1d3caa2…
|
lmata
|
158 |
for _, msg := range payload.Messages { |
|
1d3caa2…
|
lmata
|
159 |
at, err := time.Parse(time.RFC3339Nano, msg.At) |
|
1d3caa2…
|
lmata
|
160 |
if err != nil { |
|
1d3caa2…
|
lmata
|
161 |
continue |
|
53c99b5…
|
lmata
|
162 |
} |
|
53c99b5…
|
lmata
|
163 |
if !since.IsZero() && !at.After(since) { |
|
53c99b5…
|
lmata
|
164 |
continue // server-side filter is best-effort; guard here too |
|
1d3caa2…
|
lmata
|
165 |
} |
|
1d3caa2…
|
lmata
|
166 |
out = append(out, Message{At: at, Channel: channel, Nick: msg.Nick, Text: msg.Text}) |
|
1d3caa2…
|
lmata
|
167 |
} |
|
1d3caa2…
|
lmata
|
168 |
} |
|
1d3caa2…
|
lmata
|
169 |
sort.Slice(out, func(i, j int) bool { return out[i].At.Before(out[j].At) }) |
|
24a217e…
|
lmata
|
170 |
return out, nil |
|
24a217e…
|
lmata
|
171 |
} |
|
24a217e…
|
lmata
|
172 |
|
|
24a217e…
|
lmata
|
173 |
func (c *httpConnector) Touch(ctx context.Context) error { |
|
1d3caa2…
|
lmata
|
174 |
for _, channel := range c.Channels() { |
|
1d3caa2…
|
lmata
|
175 |
err := c.postJSON(ctx, "/v1/channels/"+channelSlug(channel)+"/presence", map[string]string{"nick": c.nick}) |
|
1d3caa2…
|
lmata
|
176 |
if err == nil { |
|
1d3caa2…
|
lmata
|
177 |
continue |
|
1d3caa2…
|
lmata
|
178 |
} |
|
1d3caa2…
|
lmata
|
179 |
var statusErr *statusError |
|
1d3caa2…
|
lmata
|
180 |
if errors.As(err, &statusErr) && (statusErr.StatusCode == http.StatusNotFound || statusErr.StatusCode == http.StatusMethodNotAllowed) { |
|
1d3caa2…
|
lmata
|
181 |
continue |
|
1d3caa2…
|
lmata
|
182 |
} |
|
1d3caa2…
|
lmata
|
183 |
return err |
|
1d3caa2…
|
lmata
|
184 |
} |
|
1d3caa2…
|
lmata
|
185 |
return nil |
|
1d3caa2…
|
lmata
|
186 |
} |
|
1d3caa2…
|
lmata
|
187 |
|
|
1d3caa2…
|
lmata
|
188 |
func (c *httpConnector) JoinChannel(_ context.Context, channel string) error { |
|
1d3caa2…
|
lmata
|
189 |
channel = normalizeChannel(channel) |
|
1d3caa2…
|
lmata
|
190 |
if channel == "" { |
|
1d3caa2…
|
lmata
|
191 |
return fmt.Errorf("sessionrelay: join channel is required") |
|
1d3caa2…
|
lmata
|
192 |
} |
|
1d3caa2…
|
lmata
|
193 |
c.mu.Lock() |
|
1d3caa2…
|
lmata
|
194 |
defer c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
195 |
if slices.Contains(c.channels, channel) { |
|
24a217e…
|
lmata
|
196 |
return nil |
|
24a217e…
|
lmata
|
197 |
} |
|
1d3caa2…
|
lmata
|
198 |
c.channels = append(c.channels, channel) |
|
1d3caa2…
|
lmata
|
199 |
return nil |
|
1d3caa2…
|
lmata
|
200 |
} |
|
1d3caa2…
|
lmata
|
201 |
|
|
1d3caa2…
|
lmata
|
202 |
func (c *httpConnector) PartChannel(_ context.Context, channel string) error { |
|
1d3caa2…
|
lmata
|
203 |
channel = normalizeChannel(channel) |
|
1d3caa2…
|
lmata
|
204 |
if channel == "" { |
|
1d3caa2…
|
lmata
|
205 |
return fmt.Errorf("sessionrelay: part channel is required") |
|
1d3caa2…
|
lmata
|
206 |
} |
|
1d3caa2…
|
lmata
|
207 |
if channel == c.primary { |
|
1d3caa2…
|
lmata
|
208 |
return fmt.Errorf("sessionrelay: cannot part control channel %s", channel) |
|
1d3caa2…
|
lmata
|
209 |
} |
|
1d3caa2…
|
lmata
|
210 |
c.mu.Lock() |
|
1d3caa2…
|
lmata
|
211 |
defer c.mu.Unlock() |
|
1d3caa2…
|
lmata
|
212 |
filtered := c.channels[:0] |
|
1d3caa2…
|
lmata
|
213 |
for _, existing := range c.channels { |
|
1d3caa2…
|
lmata
|
214 |
if existing == channel { |
|
1d3caa2…
|
lmata
|
215 |
continue |
|
1d3caa2…
|
lmata
|
216 |
} |
|
1d3caa2…
|
lmata
|
217 |
filtered = append(filtered, existing) |
|
1d3caa2…
|
lmata
|
218 |
} |
|
1d3caa2…
|
lmata
|
219 |
c.channels = filtered |
|
1d3caa2…
|
lmata
|
220 |
return nil |
|
1d3caa2…
|
lmata
|
221 |
} |
|
1d3caa2…
|
lmata
|
222 |
|
|
1d3caa2…
|
lmata
|
223 |
func (c *httpConnector) Channels() []string { |
|
1d3caa2…
|
lmata
|
224 |
c.mu.RLock() |
|
1d3caa2…
|
lmata
|
225 |
defer c.mu.RUnlock() |
|
1d3caa2…
|
lmata
|
226 |
return append([]string(nil), c.channels...) |
|
1d3caa2…
|
lmata
|
227 |
} |
|
1d3caa2…
|
lmata
|
228 |
|
|
1d3caa2…
|
lmata
|
229 |
func (c *httpConnector) ControlChannel() string { |
|
1d3caa2…
|
lmata
|
230 |
return c.primary |
|
1d3caa2…
|
lmata
|
231 |
} |
|
1d3caa2…
|
lmata
|
232 |
|
|
763c873…
|
lmata
|
233 |
func (c *httpConnector) Close(ctx context.Context) error { |
|
763c873…
|
lmata
|
234 |
if !c.deleteOnClose || !c.registeredByConnector || c.baseURL == "" || c.token == "" { |
|
763c873…
|
lmata
|
235 |
return nil |
|
763c873…
|
lmata
|
236 |
} |
|
763c873…
|
lmata
|
237 |
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.baseURL+"/v1/agents/"+c.nick, nil) |
|
763c873…
|
lmata
|
238 |
if err != nil { |
|
763c873…
|
lmata
|
239 |
return err |
|
763c873…
|
lmata
|
240 |
} |
|
763c873…
|
lmata
|
241 |
c.authorize(req) |
|
763c873…
|
lmata
|
242 |
resp, err := c.http.Do(req) |
|
763c873…
|
lmata
|
243 |
if err != nil { |
|
763c873…
|
lmata
|
244 |
return err |
|
763c873…
|
lmata
|
245 |
} |
|
763c873…
|
lmata
|
246 |
defer resp.Body.Close() |
|
763c873…
|
lmata
|
247 |
if resp.StatusCode/100 != 2 && resp.StatusCode != http.StatusNotFound { |
|
763c873…
|
lmata
|
248 |
return fmt.Errorf("sessionrelay: delete %s: %s", c.nick, resp.Status) |
|
763c873…
|
lmata
|
249 |
} |
|
24a217e…
|
lmata
|
250 |
return nil |
|
24a217e…
|
lmata
|
251 |
} |
|
24a217e…
|
lmata
|
252 |
|
|
24a217e…
|
lmata
|
253 |
func (c *httpConnector) postJSON(ctx context.Context, path string, body any) error { |
|
24a217e…
|
lmata
|
254 |
data, _ := json.Marshal(body) |
|
24a217e…
|
lmata
|
255 |
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(data)) |
|
24a217e…
|
lmata
|
256 |
if err != nil { |
|
24a217e…
|
lmata
|
257 |
return err |
|
24a217e…
|
lmata
|
258 |
} |
|
24a217e…
|
lmata
|
259 |
c.authorize(req) |
|
24a217e…
|
lmata
|
260 |
req.Header.Set("Content-Type", "application/json") |
|
24a217e…
|
lmata
|
261 |
|
|
24a217e…
|
lmata
|
262 |
resp, err := c.http.Do(req) |
|
24a217e…
|
lmata
|
263 |
if err != nil { |
|
24a217e…
|
lmata
|
264 |
return err |
|
24a217e…
|
lmata
|
265 |
} |
|
24a217e…
|
lmata
|
266 |
defer resp.Body.Close() |
|
24a217e…
|
lmata
|
267 |
if resp.StatusCode/100 != 2 { |
|
24a217e…
|
lmata
|
268 |
return &statusError{Op: path, StatusCode: resp.StatusCode, Status: resp.Status} |
|
24a217e…
|
lmata
|
269 |
} |
|
24a217e…
|
lmata
|
270 |
return nil |
|
24a217e…
|
lmata
|
271 |
} |
|
24a217e…
|
lmata
|
272 |
|
|
24a217e…
|
lmata
|
273 |
func (c *httpConnector) authorize(req *http.Request) { |
|
24a217e…
|
lmata
|
274 |
req.Header.Set("Authorization", "Bearer "+c.token) |
|
24a217e…
|
lmata
|
275 |
} |
|
24a217e…
|
lmata
|
276 |
|
|
24a217e…
|
lmata
|
277 |
// statusError reports a non-2xx HTTP reply. postJSON returns it so callers can
// branch on the numeric status with errors.As.
type statusError struct {
	Op         string // request path the reply belongs to
	StatusCode int    // numeric HTTP status code
	Status     string // status line text, e.g. "404 Not Found"
}

// Error formats the failure as "sessionrelay: <op>: <status>".
func (e *statusError) Error() string {
	return "sessionrelay: " + e.Op + ": " + e.Status
}
|
24a217e…
|
lmata
|
286 |
|
|
24a217e…
|
lmata
|
287 |
// stringsTrimRightSlash returns value with every trailing '/' removed. A
// string made up only of slashes becomes empty; slashes elsewhere are kept.
func stringsTrimRightSlash(value string) string {
	end := len(value)
	for end > 0 && value[end-1] == '/' {
		end--
	}
	return value[:end]
}