ScuttleBot

scuttlebot / pkg / sessionrelay / http.go
Blame History Raw 293 lines
1
package sessionrelay
2
3
import (
4
"bytes"
5
"context"
6
"encoding/json"
7
"errors"
8
"fmt"
9
"net/http"
10
"slices"
11
"sort"
12
"sync"
13
"time"
14
)
15
16
type httpConnector struct {
17
http *http.Client
18
baseURL string
19
token string
20
primary string
21
nick string
22
23
agentType string
24
deleteOnClose bool
25
registeredByConnector bool
26
27
mu sync.RWMutex
28
channels []string
29
}
30
31
type httpMessage struct {
32
At string `json:"at"`
33
Nick string `json:"nick"`
34
Text string `json:"text"`
35
}
36
37
func newHTTPConnector(cfg Config) Connector {
38
return &httpConnector{
39
http: cfg.HTTPClient,
40
baseURL: stringsTrimRightSlash(cfg.URL),
41
token: cfg.Token,
42
primary: normalizeChannel(cfg.Channel),
43
nick: cfg.Nick,
44
agentType: cfg.IRC.AgentType,
45
deleteOnClose: cfg.IRC.DeleteOnClose,
46
channels: append([]string(nil), cfg.Channels...),
47
}
48
}
49
50
func (c *httpConnector) Connect(ctx context.Context) error {
51
if c.baseURL == "" {
52
return fmt.Errorf("sessionrelay: http transport requires url")
53
}
54
if c.token == "" {
55
return fmt.Errorf("sessionrelay: http transport requires token")
56
}
57
if c.nick != "" {
58
if err := c.registerAgent(ctx); err != nil {
59
return err
60
}
61
}
62
return nil
63
}
64
65
func (c *httpConnector) registerAgent(ctx context.Context) error {
66
body, _ := json.Marshal(map[string]any{
67
"nick": c.nick,
68
"type": c.agentType,
69
"channels": c.Channels(),
70
})
71
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/v1/agents/register", bytes.NewReader(body))
72
if err != nil {
73
return err
74
}
75
c.authorize(req)
76
req.Header.Set("Content-Type", "application/json")
77
78
resp, err := c.http.Do(req)
79
if err != nil {
80
return err
81
}
82
defer resp.Body.Close()
83
84
switch resp.StatusCode {
85
case http.StatusCreated:
86
c.registeredByConnector = true
87
case http.StatusConflict:
88
// agent already exists; registration is best-effort, not an error
89
default:
90
return fmt.Errorf("sessionrelay: register %s: %s", c.nick, resp.Status)
91
}
92
return nil
93
}
94
95
func (c *httpConnector) Post(ctx context.Context, text string) error {
96
return c.PostWithMeta(ctx, text, nil)
97
}
98
99
func (c *httpConnector) PostTo(ctx context.Context, channel, text string) error {
100
return c.PostToWithMeta(ctx, channel, text, nil)
101
}
102
103
func (c *httpConnector) PostWithMeta(ctx context.Context, text string, meta json.RawMessage) error {
104
for _, channel := range c.Channels() {
105
if err := c.PostToWithMeta(ctx, channel, text, meta); err != nil {
106
return err
107
}
108
}
109
return nil
110
}
111
112
func (c *httpConnector) PostToWithMeta(ctx context.Context, channel, text string, meta json.RawMessage) error {
113
channel = channelSlug(channel)
114
if channel == "" {
115
return fmt.Errorf("sessionrelay: post channel is required")
116
}
117
body := map[string]any{
118
"nick": c.nick,
119
"text": text,
120
}
121
if len(meta) > 0 {
122
body["meta"] = json.RawMessage(meta)
123
}
124
return c.postJSON(ctx, "/v1/channels/"+channel+"/messages", body)
125
}
126
127
func (c *httpConnector) MessagesSince(ctx context.Context, since time.Time) ([]Message, error) {
128
out := make([]Message, 0, 32)
129
for _, channel := range c.Channels() {
130
url := c.baseURL + "/v1/channels/" + channelSlug(channel) + "/messages"
131
if !since.IsZero() {
132
url += "?since=" + since.UTC().Format(time.RFC3339Nano)
133
}
134
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
135
if err != nil {
136
return nil, err
137
}
138
c.authorize(req)
139
140
resp, err := c.http.Do(req)
141
if err != nil {
142
return nil, err
143
}
144
if resp.StatusCode/100 != 2 {
145
resp.Body.Close()
146
return nil, fmt.Errorf("sessionrelay: http messages: %s", resp.Status)
147
}
148
149
var payload struct {
150
Messages []httpMessage `json:"messages"`
151
}
152
err = json.NewDecoder(resp.Body).Decode(&payload)
153
resp.Body.Close()
154
if err != nil {
155
return nil, err
156
}
157
158
for _, msg := range payload.Messages {
159
at, err := time.Parse(time.RFC3339Nano, msg.At)
160
if err != nil {
161
continue
162
}
163
if !since.IsZero() && !at.After(since) {
164
continue // server-side filter is best-effort; guard here too
165
}
166
out = append(out, Message{At: at, Channel: channel, Nick: msg.Nick, Text: msg.Text})
167
}
168
}
169
sort.Slice(out, func(i, j int) bool { return out[i].At.Before(out[j].At) })
170
return out, nil
171
}
172
173
func (c *httpConnector) Touch(ctx context.Context) error {
174
for _, channel := range c.Channels() {
175
err := c.postJSON(ctx, "/v1/channels/"+channelSlug(channel)+"/presence", map[string]string{"nick": c.nick})
176
if err == nil {
177
continue
178
}
179
var statusErr *statusError
180
if errors.As(err, &statusErr) && (statusErr.StatusCode == http.StatusNotFound || statusErr.StatusCode == http.StatusMethodNotAllowed) {
181
continue
182
}
183
return err
184
}
185
return nil
186
}
187
188
func (c *httpConnector) JoinChannel(_ context.Context, channel string) error {
189
channel = normalizeChannel(channel)
190
if channel == "" {
191
return fmt.Errorf("sessionrelay: join channel is required")
192
}
193
c.mu.Lock()
194
defer c.mu.Unlock()
195
if slices.Contains(c.channels, channel) {
196
return nil
197
}
198
c.channels = append(c.channels, channel)
199
return nil
200
}
201
202
func (c *httpConnector) PartChannel(_ context.Context, channel string) error {
203
channel = normalizeChannel(channel)
204
if channel == "" {
205
return fmt.Errorf("sessionrelay: part channel is required")
206
}
207
if channel == c.primary {
208
return fmt.Errorf("sessionrelay: cannot part control channel %s", channel)
209
}
210
c.mu.Lock()
211
defer c.mu.Unlock()
212
filtered := c.channels[:0]
213
for _, existing := range c.channels {
214
if existing == channel {
215
continue
216
}
217
filtered = append(filtered, existing)
218
}
219
c.channels = filtered
220
return nil
221
}
222
223
func (c *httpConnector) Channels() []string {
224
c.mu.RLock()
225
defer c.mu.RUnlock()
226
return append([]string(nil), c.channels...)
227
}
228
229
func (c *httpConnector) ControlChannel() string {
230
return c.primary
231
}
232
233
func (c *httpConnector) Close(ctx context.Context) error {
234
if !c.deleteOnClose || !c.registeredByConnector || c.baseURL == "" || c.token == "" {
235
return nil
236
}
237
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.baseURL+"/v1/agents/"+c.nick, nil)
238
if err != nil {
239
return err
240
}
241
c.authorize(req)
242
resp, err := c.http.Do(req)
243
if err != nil {
244
return err
245
}
246
defer resp.Body.Close()
247
if resp.StatusCode/100 != 2 && resp.StatusCode != http.StatusNotFound {
248
return fmt.Errorf("sessionrelay: delete %s: %s", c.nick, resp.Status)
249
}
250
return nil
251
}
252
253
func (c *httpConnector) postJSON(ctx context.Context, path string, body any) error {
254
data, _ := json.Marshal(body)
255
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(data))
256
if err != nil {
257
return err
258
}
259
c.authorize(req)
260
req.Header.Set("Content-Type", "application/json")
261
262
resp, err := c.http.Do(req)
263
if err != nil {
264
return err
265
}
266
defer resp.Body.Close()
267
if resp.StatusCode/100 != 2 {
268
return &statusError{Op: path, StatusCode: resp.StatusCode, Status: resp.Status}
269
}
270
return nil
271
}
272
273
func (c *httpConnector) authorize(req *http.Request) {
274
req.Header.Set("Authorization", "Bearer "+c.token)
275
}
276
277
type statusError struct {
278
Op string
279
StatusCode int
280
Status string
281
}
282
283
func (e *statusError) Error() string {
284
return fmt.Sprintf("sessionrelay: %s: %s", e.Op, e.Status)
285
}
286
287
func stringsTrimRightSlash(value string) string {
288
for len(value) > 0 && value[len(value)-1] == '/' {
289
value = value[:len(value)-1]
290
}
291
return value
292
}
293

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button