ScuttleBot

scuttlebot / pkg / sessionrelay / http.go
Source Blame History 292 lines
24a217e… lmata 1 package sessionrelay
24a217e… lmata 2
24a217e… lmata 3 import (
24a217e… lmata 4 "bytes"
24a217e… lmata 5 "context"
24a217e… lmata 6 "encoding/json"
24a217e… lmata 7 "errors"
24a217e… lmata 8 "fmt"
24a217e… lmata 9 "net/http"
1d3caa2… lmata 10 "slices"
1d3caa2… lmata 11 "sort"
1d3caa2… lmata 12 "sync"
24a217e… lmata 13 "time"
24a217e… lmata 14 )
24a217e… lmata 15
24a217e… lmata 16 type httpConnector struct {
24a217e… lmata 17 http *http.Client
24a217e… lmata 18 baseURL string
24a217e… lmata 19 token string
1d3caa2… lmata 20 primary string
24a217e… lmata 21 nick string
1d3caa2… lmata 22
763c873… lmata 23 agentType string
763c873… lmata 24 deleteOnClose bool
763c873… lmata 25 registeredByConnector bool
763c873… lmata 26
1d3caa2… lmata 27 mu sync.RWMutex
1d3caa2… lmata 28 channels []string
24a217e… lmata 29 }
24a217e… lmata 30
24a217e… lmata 31 type httpMessage struct {
24a217e… lmata 32 At string `json:"at"`
24a217e… lmata 33 Nick string `json:"nick"`
24a217e… lmata 34 Text string `json:"text"`
24a217e… lmata 35 }
24a217e… lmata 36
24a217e… lmata 37 func newHTTPConnector(cfg Config) Connector {
24a217e… lmata 38 return &httpConnector{
763c873… lmata 39 http: cfg.HTTPClient,
763c873… lmata 40 baseURL: stringsTrimRightSlash(cfg.URL),
763c873… lmata 41 token: cfg.Token,
763c873… lmata 42 primary: normalizeChannel(cfg.Channel),
763c873… lmata 43 nick: cfg.Nick,
763c873… lmata 44 agentType: cfg.IRC.AgentType,
763c873… lmata 45 deleteOnClose: cfg.IRC.DeleteOnClose,
763c873… lmata 46 channels: append([]string(nil), cfg.Channels...),
24a217e… lmata 47 }
24a217e… lmata 48 }
24a217e… lmata 49
763c873… lmata 50 func (c *httpConnector) Connect(ctx context.Context) error {
24a217e… lmata 51 if c.baseURL == "" {
24a217e… lmata 52 return fmt.Errorf("sessionrelay: http transport requires url")
24a217e… lmata 53 }
24a217e… lmata 54 if c.token == "" {
24a217e… lmata 55 return fmt.Errorf("sessionrelay: http transport requires token")
24a217e… lmata 56 }
763c873… lmata 57 if c.nick != "" {
763c873… lmata 58 if err := c.registerAgent(ctx); err != nil {
763c873… lmata 59 return err
763c873… lmata 60 }
763c873… lmata 61 }
763c873… lmata 62 return nil
763c873… lmata 63 }
763c873… lmata 64
763c873… lmata 65 func (c *httpConnector) registerAgent(ctx context.Context) error {
763c873… lmata 66 body, _ := json.Marshal(map[string]any{
763c873… lmata 67 "nick": c.nick,
763c873… lmata 68 "type": c.agentType,
763c873… lmata 69 "channels": c.Channels(),
763c873… lmata 70 })
763c873… lmata 71 req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/v1/agents/register", bytes.NewReader(body))
763c873… lmata 72 if err != nil {
763c873… lmata 73 return err
763c873… lmata 74 }
763c873… lmata 75 c.authorize(req)
763c873… lmata 76 req.Header.Set("Content-Type", "application/json")
763c873… lmata 77
763c873… lmata 78 resp, err := c.http.Do(req)
763c873… lmata 79 if err != nil {
763c873… lmata 80 return err
763c873… lmata 81 }
763c873… lmata 82 defer resp.Body.Close()
763c873… lmata 83
763c873… lmata 84 switch resp.StatusCode {
763c873… lmata 85 case http.StatusCreated:
763c873… lmata 86 c.registeredByConnector = true
763c873… lmata 87 case http.StatusConflict:
763c873… lmata 88 // agent already exists; registration is best-effort, not an error
763c873… lmata 89 default:
763c873… lmata 90 return fmt.Errorf("sessionrelay: register %s: %s", c.nick, resp.Status)
763c873… lmata 91 }
24a217e… lmata 92 return nil
24a217e… lmata 93 }
24a217e… lmata 94
24a217e… lmata 95 func (c *httpConnector) Post(ctx context.Context, text string) error {
f3c383e… noreply 96 return c.PostWithMeta(ctx, text, nil)
f3c383e… noreply 97 }
f3c383e… noreply 98
f3c383e… noreply 99 func (c *httpConnector) PostTo(ctx context.Context, channel, text string) error {
f3c383e… noreply 100 return c.PostToWithMeta(ctx, channel, text, nil)
f3c383e… noreply 101 }
f3c383e… noreply 102
f3c383e… noreply 103 func (c *httpConnector) PostWithMeta(ctx context.Context, text string, meta json.RawMessage) error {
1d3caa2… lmata 104 for _, channel := range c.Channels() {
f3c383e… noreply 105 if err := c.PostToWithMeta(ctx, channel, text, meta); err != nil {
1d3caa2… lmata 106 return err
1d3caa2… lmata 107 }
1d3caa2… lmata 108 }
1d3caa2… lmata 109 return nil
1d3caa2… lmata 110 }
1d3caa2… lmata 111
f3c383e… noreply 112 func (c *httpConnector) PostToWithMeta(ctx context.Context, channel, text string, meta json.RawMessage) error {
1d3caa2… lmata 113 channel = channelSlug(channel)
1d3caa2… lmata 114 if channel == "" {
1d3caa2… lmata 115 return fmt.Errorf("sessionrelay: post channel is required")
1d3caa2… lmata 116 }
f3c383e… noreply 117 body := map[string]any{
24a217e… lmata 118 "nick": c.nick,
24a217e… lmata 119 "text": text,
f3c383e… noreply 120 }
f3c383e… noreply 121 if len(meta) > 0 {
f3c383e… noreply 122 body["meta"] = json.RawMessage(meta)
f3c383e… noreply 123 }
f3c383e… noreply 124 return c.postJSON(ctx, "/v1/channels/"+channel+"/messages", body)
24a217e… lmata 125 }
24a217e… lmata 126
24a217e… lmata 127 func (c *httpConnector) MessagesSince(ctx context.Context, since time.Time) ([]Message, error) {
1d3caa2… lmata 128 out := make([]Message, 0, 32)
1d3caa2… lmata 129 for _, channel := range c.Channels() {
b71f8ab… lmata 130 url := c.baseURL + "/v1/channels/" + channelSlug(channel) + "/messages"
b71f8ab… lmata 131 if !since.IsZero() {
b71f8ab… lmata 132 url += "?since=" + since.UTC().Format(time.RFC3339Nano)
b71f8ab… lmata 133 }
b71f8ab… lmata 134 req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
1d3caa2… lmata 135 if err != nil {
1d3caa2… lmata 136 return nil, err
1d3caa2… lmata 137 }
1d3caa2… lmata 138 c.authorize(req)
1d3caa2… lmata 139
1d3caa2… lmata 140 resp, err := c.http.Do(req)
1d3caa2… lmata 141 if err != nil {
1d3caa2… lmata 142 return nil, err
1d3caa2… lmata 143 }
1d3caa2… lmata 144 if resp.StatusCode/100 != 2 {
1d3caa2… lmata 145 resp.Body.Close()
1d3caa2… lmata 146 return nil, fmt.Errorf("sessionrelay: http messages: %s", resp.Status)
1d3caa2… lmata 147 }
1d3caa2… lmata 148
1d3caa2… lmata 149 var payload struct {
1d3caa2… lmata 150 Messages []httpMessage `json:"messages"`
1d3caa2… lmata 151 }
1d3caa2… lmata 152 err = json.NewDecoder(resp.Body).Decode(&payload)
1d3caa2… lmata 153 resp.Body.Close()
1d3caa2… lmata 154 if err != nil {
1d3caa2… lmata 155 return nil, err
1d3caa2… lmata 156 }
1d3caa2… lmata 157
1d3caa2… lmata 158 for _, msg := range payload.Messages {
1d3caa2… lmata 159 at, err := time.Parse(time.RFC3339Nano, msg.At)
1d3caa2… lmata 160 if err != nil {
1d3caa2… lmata 161 continue
53c99b5… lmata 162 }
53c99b5… lmata 163 if !since.IsZero() && !at.After(since) {
53c99b5… lmata 164 continue // server-side filter is best-effort; guard here too
1d3caa2… lmata 165 }
1d3caa2… lmata 166 out = append(out, Message{At: at, Channel: channel, Nick: msg.Nick, Text: msg.Text})
1d3caa2… lmata 167 }
1d3caa2… lmata 168 }
1d3caa2… lmata 169 sort.Slice(out, func(i, j int) bool { return out[i].At.Before(out[j].At) })
24a217e… lmata 170 return out, nil
24a217e… lmata 171 }
24a217e… lmata 172
24a217e… lmata 173 func (c *httpConnector) Touch(ctx context.Context) error {
1d3caa2… lmata 174 for _, channel := range c.Channels() {
1d3caa2… lmata 175 err := c.postJSON(ctx, "/v1/channels/"+channelSlug(channel)+"/presence", map[string]string{"nick": c.nick})
1d3caa2… lmata 176 if err == nil {
1d3caa2… lmata 177 continue
1d3caa2… lmata 178 }
1d3caa2… lmata 179 var statusErr *statusError
1d3caa2… lmata 180 if errors.As(err, &statusErr) && (statusErr.StatusCode == http.StatusNotFound || statusErr.StatusCode == http.StatusMethodNotAllowed) {
1d3caa2… lmata 181 continue
1d3caa2… lmata 182 }
1d3caa2… lmata 183 return err
1d3caa2… lmata 184 }
1d3caa2… lmata 185 return nil
1d3caa2… lmata 186 }
1d3caa2… lmata 187
1d3caa2… lmata 188 func (c *httpConnector) JoinChannel(_ context.Context, channel string) error {
1d3caa2… lmata 189 channel = normalizeChannel(channel)
1d3caa2… lmata 190 if channel == "" {
1d3caa2… lmata 191 return fmt.Errorf("sessionrelay: join channel is required")
1d3caa2… lmata 192 }
1d3caa2… lmata 193 c.mu.Lock()
1d3caa2… lmata 194 defer c.mu.Unlock()
1d3caa2… lmata 195 if slices.Contains(c.channels, channel) {
24a217e… lmata 196 return nil
24a217e… lmata 197 }
1d3caa2… lmata 198 c.channels = append(c.channels, channel)
1d3caa2… lmata 199 return nil
1d3caa2… lmata 200 }
1d3caa2… lmata 201
1d3caa2… lmata 202 func (c *httpConnector) PartChannel(_ context.Context, channel string) error {
1d3caa2… lmata 203 channel = normalizeChannel(channel)
1d3caa2… lmata 204 if channel == "" {
1d3caa2… lmata 205 return fmt.Errorf("sessionrelay: part channel is required")
1d3caa2… lmata 206 }
1d3caa2… lmata 207 if channel == c.primary {
1d3caa2… lmata 208 return fmt.Errorf("sessionrelay: cannot part control channel %s", channel)
1d3caa2… lmata 209 }
1d3caa2… lmata 210 c.mu.Lock()
1d3caa2… lmata 211 defer c.mu.Unlock()
1d3caa2… lmata 212 filtered := c.channels[:0]
1d3caa2… lmata 213 for _, existing := range c.channels {
1d3caa2… lmata 214 if existing == channel {
1d3caa2… lmata 215 continue
1d3caa2… lmata 216 }
1d3caa2… lmata 217 filtered = append(filtered, existing)
1d3caa2… lmata 218 }
1d3caa2… lmata 219 c.channels = filtered
1d3caa2… lmata 220 return nil
1d3caa2… lmata 221 }
1d3caa2… lmata 222
1d3caa2… lmata 223 func (c *httpConnector) Channels() []string {
1d3caa2… lmata 224 c.mu.RLock()
1d3caa2… lmata 225 defer c.mu.RUnlock()
1d3caa2… lmata 226 return append([]string(nil), c.channels...)
1d3caa2… lmata 227 }
1d3caa2… lmata 228
1d3caa2… lmata 229 func (c *httpConnector) ControlChannel() string {
1d3caa2… lmata 230 return c.primary
1d3caa2… lmata 231 }
1d3caa2… lmata 232
763c873… lmata 233 func (c *httpConnector) Close(ctx context.Context) error {
763c873… lmata 234 if !c.deleteOnClose || !c.registeredByConnector || c.baseURL == "" || c.token == "" {
763c873… lmata 235 return nil
763c873… lmata 236 }
763c873… lmata 237 req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.baseURL+"/v1/agents/"+c.nick, nil)
763c873… lmata 238 if err != nil {
763c873… lmata 239 return err
763c873… lmata 240 }
763c873… lmata 241 c.authorize(req)
763c873… lmata 242 resp, err := c.http.Do(req)
763c873… lmata 243 if err != nil {
763c873… lmata 244 return err
763c873… lmata 245 }
763c873… lmata 246 defer resp.Body.Close()
763c873… lmata 247 if resp.StatusCode/100 != 2 && resp.StatusCode != http.StatusNotFound {
763c873… lmata 248 return fmt.Errorf("sessionrelay: delete %s: %s", c.nick, resp.Status)
763c873… lmata 249 }
24a217e… lmata 250 return nil
24a217e… lmata 251 }
24a217e… lmata 252
24a217e… lmata 253 func (c *httpConnector) postJSON(ctx context.Context, path string, body any) error {
24a217e… lmata 254 data, _ := json.Marshal(body)
24a217e… lmata 255 req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(data))
24a217e… lmata 256 if err != nil {
24a217e… lmata 257 return err
24a217e… lmata 258 }
24a217e… lmata 259 c.authorize(req)
24a217e… lmata 260 req.Header.Set("Content-Type", "application/json")
24a217e… lmata 261
24a217e… lmata 262 resp, err := c.http.Do(req)
24a217e… lmata 263 if err != nil {
24a217e… lmata 264 return err
24a217e… lmata 265 }
24a217e… lmata 266 defer resp.Body.Close()
24a217e… lmata 267 if resp.StatusCode/100 != 2 {
24a217e… lmata 268 return &statusError{Op: path, StatusCode: resp.StatusCode, Status: resp.Status}
24a217e… lmata 269 }
24a217e… lmata 270 return nil
24a217e… lmata 271 }
24a217e… lmata 272
24a217e… lmata 273 func (c *httpConnector) authorize(req *http.Request) {
24a217e… lmata 274 req.Header.Set("Authorization", "Bearer "+c.token)
24a217e… lmata 275 }
24a217e… lmata 276
24a217e… lmata 277 type statusError struct {
24a217e… lmata 278 Op string
24a217e… lmata 279 StatusCode int
24a217e… lmata 280 Status string
24a217e… lmata 281 }
24a217e… lmata 282
24a217e… lmata 283 func (e *statusError) Error() string {
24a217e… lmata 284 return fmt.Sprintf("sessionrelay: %s: %s", e.Op, e.Status)
24a217e… lmata 285 }
24a217e… lmata 286
24a217e… lmata 287 func stringsTrimRightSlash(value string) string {
24a217e… lmata 288 for len(value) > 0 && value[len(value)-1] == '/' {
24a217e… lmata 289 value = value[:len(value)-1]
24a217e… lmata 290 }
24a217e… lmata 291 return value
24a217e… lmata 292 }

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button