ScuttleBot

scuttlebot / pkg / agentrelay / relay.go
Source Blame History 255 lines
5ac549c… lmata 1 // Package agentrelay lets any agent post status to a scuttlebot IRC channel
5ac549c… lmata 2 // and receive human instructions mid-work.
5ac549c… lmata 3 //
5ac549c… lmata 4 // Typical usage:
5ac549c… lmata 5 //
5ac549c… lmata 6 // relay, err := agentrelay.New(agentrelay.Config{
5ac549c… lmata 7 // ServerURL: "http://localhost:8080",
5ac549c… lmata 8 // Token: os.Getenv("SCUTTLEBOT_TOKEN"),
5ac549c… lmata 9 // Nick: "my-agent",
5ac549c… lmata 10 // Channel: "#fleet",
5ac549c… lmata 11 // })
5ac549c… lmata 12 // relay.Post("starting task: rewrite auth module")
5ac549c… lmata 13 //
5ac549c… lmata 14 // // between steps, check for human instructions
5ac549c… lmata 15 // if msg, ok := relay.Poll(); ok {
5ac549c… lmata 16 // // msg.Text is what the human said — incorporate or surface to agent
5ac549c… lmata 17 // }
5ac549c… lmata 18 //
5ac549c… lmata 19 // // or block waiting for approval
5ac549c… lmata 20 // relay.Post("about to drop table users — approve?")
5ac549c… lmata 21 // if err := relay.WaitFor("yes", 2*time.Minute); err != nil {
5ac549c… lmata 22 // // timed out or got "no" — abort
5ac549c… lmata 23 // }
5ac549c… lmata 24 package agentrelay
5ac549c… lmata 25
5ac549c… lmata 26 import (
5ac549c… lmata 27 "bufio"
5ac549c… lmata 28 "bytes"
5ac549c… lmata 29 "context"
5ac549c… lmata 30 "encoding/json"
5ac549c… lmata 31 "fmt"
5ac549c… lmata 32 "io"
5ac549c… lmata 33 "net/http"
5ac549c… lmata 34 "strings"
5ac549c… lmata 35 "sync"
5ac549c… lmata 36 "time"
5ac549c… lmata 37 )
5ac549c… lmata 38
// Config holds the connection settings for a Relay. New rejects a Config in
// which any of the four fields is empty.
type Config struct {
	// ServerURL is the base URL of the scuttlebot HTTP API.
	ServerURL string
	// Token is the bearer token presented to scuttlebot.
	Token string
	// Nick is the IRC nick of this agent. Inbound messages carrying this
	// nick are treated as the agent's own and are dropped.
	Nick string
	// Channel is the IRC channel used both for posting and for listening.
	Channel string
}
5ac549c… lmata 50
// Message is a single chat line received from the channel, decoded from the
// JSON payloads served by scuttlebot.
type Message struct {
	// At is the timestamp decoded from the "at" field.
	At time.Time `json:"at"`
	// Nick is the nick attached to the message.
	Nick string `json:"nick"`
	// Text is the message body.
	Text string `json:"text"`
	// Channel is the value of the "channel" field.
	Channel string `json:"channel"`
}
5ac549c… lmata 58
5ac549c… lmata 59 // Relay posts status messages to an IRC channel and surfaces inbound
5ac549c… lmata 60 // human messages to the running agent. It is safe for concurrent use.
5ac549c… lmata 61 type Relay struct {
1066004… lmata 62 cfg Config
1066004… lmata 63 http *http.Client
5ac549c… lmata 64
1066004… lmata 65 mu sync.Mutex
1066004… lmata 66 inbox []Message // buffered inbound messages not yet consumed
1066004… lmata 67 cancel context.CancelFunc
5ac549c… lmata 68 }
5ac549c… lmata 69
5ac549c… lmata 70 // New creates a Relay and starts listening for inbound messages via SSE.
5ac549c… lmata 71 // Call Close when done.
5ac549c… lmata 72 func New(cfg Config) (*Relay, error) {
5ac549c… lmata 73 if cfg.ServerURL == "" || cfg.Token == "" || cfg.Nick == "" || cfg.Channel == "" {
5ac549c… lmata 74 return nil, fmt.Errorf("agentrelay: ServerURL, Token, Nick, and Channel are required")
5ac549c… lmata 75 }
5ac549c… lmata 76 r := &Relay{
5ac549c… lmata 77 cfg: cfg,
5ac549c… lmata 78 http: &http.Client{Timeout: 0}, // no timeout for SSE
5ac549c… lmata 79 }
5ac549c… lmata 80 ctx, cancel := context.WithCancel(context.Background())
5ac549c… lmata 81 r.cancel = cancel
5ac549c… lmata 82 go r.streamLoop(ctx)
5ac549c… lmata 83 return r, nil
5ac549c… lmata 84 }
5ac549c… lmata 85
5ac549c… lmata 86 // Post sends a status message to the channel. Non-blocking.
5ac549c… lmata 87 func (r *Relay) Post(text string) error {
5ac549c… lmata 88 body, _ := json.Marshal(map[string]string{
5ac549c… lmata 89 "text": text,
5ac549c… lmata 90 "nick": r.cfg.Nick,
5ac549c… lmata 91 })
5ac549c… lmata 92 slug := strings.TrimPrefix(r.cfg.Channel, "#")
5ac549c… lmata 93 req, err := http.NewRequest("POST", r.cfg.ServerURL+"/v1/channels/"+slug+"/messages", bytes.NewReader(body))
5ac549c… lmata 94 if err != nil {
5ac549c… lmata 95 return err
5ac549c… lmata 96 }
5ac549c… lmata 97 req.Header.Set("Content-Type", "application/json")
5ac549c… lmata 98 req.Header.Set("Authorization", "Bearer "+r.cfg.Token)
5ac549c… lmata 99 resp, err := r.http.Do(req)
5ac549c… lmata 100 if err != nil {
5ac549c… lmata 101 return fmt.Errorf("agentrelay post: %w", err)
5ac549c… lmata 102 }
5ac549c… lmata 103 resp.Body.Close()
5ac549c… lmata 104 return nil
5ac549c… lmata 105 }
5ac549c… lmata 106
5ac549c… lmata 107 // Poll returns the oldest unread inbound message, if any.
5ac549c… lmata 108 // Returns false if there are no pending messages.
5ac549c… lmata 109 func (r *Relay) Poll() (Message, bool) {
5ac549c… lmata 110 r.mu.Lock()
5ac549c… lmata 111 defer r.mu.Unlock()
5ac549c… lmata 112 if len(r.inbox) == 0 {
5ac549c… lmata 113 return Message{}, false
5ac549c… lmata 114 }
5ac549c… lmata 115 msg := r.inbox[0]
5ac549c… lmata 116 r.inbox = r.inbox[1:]
5ac549c… lmata 117 return msg, true
5ac549c… lmata 118 }
5ac549c… lmata 119
5ac549c… lmata 120 // Drain returns all buffered inbound messages and clears the inbox.
5ac549c… lmata 121 func (r *Relay) Drain() []Message {
5ac549c… lmata 122 r.mu.Lock()
5ac549c… lmata 123 defer r.mu.Unlock()
5ac549c… lmata 124 msgs := r.inbox
5ac549c… lmata 125 r.inbox = nil
5ac549c… lmata 126 return msgs
5ac549c… lmata 127 }
5ac549c… lmata 128
5ac549c… lmata 129 // WaitFor blocks until a message containing keyword arrives (case-insensitive),
5ac549c… lmata 130 // or until timeout. Returns an error if the timeout elapses or a message
5ac549c… lmata 131 // containing "no" or "stop" arrives first.
5ac549c… lmata 132 func (r *Relay) WaitFor(keyword string, timeout time.Duration) error {
5ac549c… lmata 133 deadline := time.Now().Add(timeout)
5ac549c… lmata 134 for time.Now().Before(deadline) {
5ac549c… lmata 135 msg, ok := r.Poll()
5ac549c… lmata 136 if ok {
5ac549c… lmata 137 lower := strings.ToLower(msg.Text)
5ac549c… lmata 138 if strings.Contains(lower, strings.ToLower(keyword)) {
5ac549c… lmata 139 return nil
5ac549c… lmata 140 }
5ac549c… lmata 141 if strings.Contains(lower, "no") || strings.Contains(lower, "stop") || strings.Contains(lower, "abort") {
5ac549c… lmata 142 return fmt.Errorf("agentrelay: operator said %q", msg.Text)
5ac549c… lmata 143 }
5ac549c… lmata 144 }
5ac549c… lmata 145 time.Sleep(250 * time.Millisecond)
5ac549c… lmata 146 }
5ac549c… lmata 147 return fmt.Errorf("agentrelay: timed out waiting for %q", keyword)
5ac549c… lmata 148 }
5ac549c… lmata 149
5ac549c… lmata 150 // Close stops the background SSE listener.
5ac549c… lmata 151 func (r *Relay) Close() {
5ac549c… lmata 152 if r.cancel != nil {
5ac549c… lmata 153 r.cancel()
5ac549c… lmata 154 }
5ac549c… lmata 155 }
5ac549c… lmata 156
5ac549c… lmata 157 // streamLoop maintains an SSE connection and feeds inbound messages into inbox.
5ac549c… lmata 158 func (r *Relay) streamLoop(ctx context.Context) {
5ac549c… lmata 159 slug := strings.TrimPrefix(r.cfg.Channel, "#")
5ac549c… lmata 160 url := r.cfg.ServerURL + "/v1/channels/" + slug + "/stream?token=" + r.cfg.Token
5ac549c… lmata 161
5ac549c… lmata 162 for {
5ac549c… lmata 163 if ctx.Err() != nil {
5ac549c… lmata 164 return
5ac549c… lmata 165 }
5ac549c… lmata 166 if err := r.stream(ctx, url); err != nil && ctx.Err() == nil {
5ac549c… lmata 167 // Back off briefly before reconnecting.
5ac549c… lmata 168 select {
5ac549c… lmata 169 case <-ctx.Done():
5ac549c… lmata 170 return
5ac549c… lmata 171 case <-time.After(3 * time.Second):
5ac549c… lmata 172 }
5ac549c… lmata 173 }
5ac549c… lmata 174 }
5ac549c… lmata 175 }
5ac549c… lmata 176
5ac549c… lmata 177 func (r *Relay) stream(ctx context.Context, url string) error {
5ac549c… lmata 178 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
5ac549c… lmata 179 if err != nil {
5ac549c… lmata 180 return err
5ac549c… lmata 181 }
5ac549c… lmata 182 req.Header.Set("Accept", "text/event-stream")
5ac549c… lmata 183 resp, err := r.http.Do(req)
5ac549c… lmata 184 if err != nil {
5ac549c… lmata 185 return err
5ac549c… lmata 186 }
5ac549c… lmata 187 defer resp.Body.Close()
5ac549c… lmata 188
5ac549c… lmata 189 scanner := bufio.NewScanner(resp.Body)
5ac549c… lmata 190 for scanner.Scan() {
5ac549c… lmata 191 line := scanner.Text()
5ac549c… lmata 192 if !strings.HasPrefix(line, "data: ") {
5ac549c… lmata 193 continue
5ac549c… lmata 194 }
5ac549c… lmata 195 data := line[6:]
5ac549c… lmata 196 var msg Message
5ac549c… lmata 197 if err := json.Unmarshal([]byte(data), &msg); err != nil {
5ac549c… lmata 198 continue
5ac549c… lmata 199 }
5ac549c… lmata 200 // Skip own messages and heartbeats.
5ac549c… lmata 201 if msg.Nick == r.cfg.Nick || msg.Text == "" {
5ac549c… lmata 202 continue
5ac549c… lmata 203 }
5ac549c… lmata 204 r.mu.Lock()
5ac549c… lmata 205 r.inbox = append(r.inbox, msg)
5ac549c… lmata 206 r.mu.Unlock()
5ac549c… lmata 207 }
5ac549c… lmata 208 return scanner.Err()
5ac549c… lmata 209 }
5ac549c… lmata 210
5ac549c… lmata 211 // Postf is a convenience wrapper for fmt.Sprintf + Post.
5ac549c… lmata 212 func (r *Relay) Postf(format string, args ...any) error {
5ac549c… lmata 213 return r.Post(fmt.Sprintf(format, args...))
5ac549c… lmata 214 }
5ac549c… lmata 215
5ac549c… lmata 216 // MustPost posts and panics on error. Useful in quick scripts.
5ac549c… lmata 217 func (r *Relay) MustPost(text string) {
5ac549c… lmata 218 if err := r.Post(text); err != nil {
5ac549c… lmata 219 panic(err)
5ac549c… lmata 220 }
5ac549c… lmata 221 }
5ac549c… lmata 222
5ac549c… lmata 223 // FetchHistory returns recent messages from the channel (for catching up).
5ac549c… lmata 224 func (r *Relay) FetchHistory(limit int) ([]Message, error) {
5ac549c… lmata 225 slug := strings.TrimPrefix(r.cfg.Channel, "#")
5ac549c… lmata 226 req, err := http.NewRequest("GET", fmt.Sprintf("%s/v1/channels/%s/messages", r.cfg.ServerURL, slug), nil)
5ac549c… lmata 227 if err != nil {
5ac549c… lmata 228 return nil, err
5ac549c… lmata 229 }
5ac549c… lmata 230 req.Header.Set("Authorization", "Bearer "+r.cfg.Token)
5ac549c… lmata 231 resp, err := r.http.Do(req)
5ac549c… lmata 232 if err != nil {
5ac549c… lmata 233 return nil, fmt.Errorf("agentrelay history: %w", err)
5ac549c… lmata 234 }
5ac549c… lmata 235 defer resp.Body.Close()
5ac549c… lmata 236 raw, _ := io.ReadAll(resp.Body)
5ac549c… lmata 237 var result struct {
5ac549c… lmata 238 Messages []Message `json:"messages"`
5ac549c… lmata 239 }
5ac549c… lmata 240 if err := json.Unmarshal(raw, &result); err != nil {
5ac549c… lmata 241 return nil, fmt.Errorf("agentrelay history parse: %w", err)
5ac549c… lmata 242 }
5ac549c… lmata 243 msgs := result.Messages
5ac549c… lmata 244 if limit > 0 && len(msgs) > limit {
5ac549c… lmata 245 msgs = msgs[len(msgs)-limit:]
5ac549c… lmata 246 }
5ac549c… lmata 247 // Filter out own messages.
5ac549c… lmata 248 out := msgs[:0]
5ac549c… lmata 249 for _, m := range msgs {
5ac549c… lmata 250 if m.Nick != r.cfg.Nick {
5ac549c… lmata 251 out = append(out, m)
5ac549c… lmata 252 }
5ac549c… lmata 253 }
5ac549c… lmata 254 return out, nil
5ac549c… lmata 255 }

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button