ScuttleBot

scuttlebot / pkg / agentrelay / relay.go
Blame History Raw 256 lines
1
// Package agentrelay lets any agent post status to a scuttlebot IRC channel
2
// and receive human instructions mid-work.
3
//
4
// Typical usage:
5
//
6
// relay, err := agentrelay.New(agentrelay.Config{
7
// ServerURL: "http://localhost:8080",
8
// Token: os.Getenv("SCUTTLEBOT_TOKEN"),
9
// Nick: "my-agent",
10
// Channel: "#fleet",
11
// })
12
// relay.Post("starting task: rewrite auth module")
13
//
14
// // between steps, check for human instructions
15
// if msg, ok := relay.Poll(); ok {
16
// // msg.Text is what the human said — incorporate or surface to agent
17
// }
18
//
19
// // or block waiting for approval
20
// relay.Post("about to drop table users — approve?")
21
// if err := relay.WaitFor("yes", 2*time.Minute); err != nil {
22
// // timed out or got "no" — abort
23
// }
24
package agentrelay
25
26
import (
27
"bufio"
28
"bytes"
29
"context"
30
"encoding/json"
31
"fmt"
32
"io"
33
"net/http"
34
"strings"
35
"sync"
36
"time"
37
)
38
39
// Config holds the settings a Relay needs to talk to a scuttlebot server.
// New rejects a Config in which any of these fields is empty.
type Config struct {
	ServerURL string // base URL of the scuttlebot HTTP API
	Token     string // scuttlebot bearer token
	Nick      string // this agent's IRC nick; also used to filter out its own messages
	Channel   string // IRC channel to post to and listen on
}
50
51
// Message is one inbound channel message, decoded from the server's JSON.
type Message struct {
	At      time.Time `json:"at"`      // timestamp carried in the "at" field
	Nick    string    `json:"nick"`    // nick of the sender
	Text    string    `json:"text"`    // message body
	Channel string    `json:"channel"` // channel the message belongs to
}
58
59
// Relay posts status messages to an IRC channel and surfaces inbound
60
// human messages to the running agent. It is safe for concurrent use.
61
type Relay struct {
62
cfg Config
63
http *http.Client
64
65
mu sync.Mutex
66
inbox []Message // buffered inbound messages not yet consumed
67
cancel context.CancelFunc
68
}
69
70
// New creates a Relay and starts listening for inbound messages via SSE.
71
// Call Close when done.
72
func New(cfg Config) (*Relay, error) {
73
if cfg.ServerURL == "" || cfg.Token == "" || cfg.Nick == "" || cfg.Channel == "" {
74
return nil, fmt.Errorf("agentrelay: ServerURL, Token, Nick, and Channel are required")
75
}
76
r := &Relay{
77
cfg: cfg,
78
http: &http.Client{Timeout: 0}, // no timeout for SSE
79
}
80
ctx, cancel := context.WithCancel(context.Background())
81
r.cancel = cancel
82
go r.streamLoop(ctx)
83
return r, nil
84
}
85
86
// Post sends a status message to the channel. Non-blocking.
87
func (r *Relay) Post(text string) error {
88
body, _ := json.Marshal(map[string]string{
89
"text": text,
90
"nick": r.cfg.Nick,
91
})
92
slug := strings.TrimPrefix(r.cfg.Channel, "#")
93
req, err := http.NewRequest("POST", r.cfg.ServerURL+"/v1/channels/"+slug+"/messages", bytes.NewReader(body))
94
if err != nil {
95
return err
96
}
97
req.Header.Set("Content-Type", "application/json")
98
req.Header.Set("Authorization", "Bearer "+r.cfg.Token)
99
resp, err := r.http.Do(req)
100
if err != nil {
101
return fmt.Errorf("agentrelay post: %w", err)
102
}
103
resp.Body.Close()
104
return nil
105
}
106
107
// Poll returns the oldest unread inbound message, if any.
108
// Returns false if there are no pending messages.
109
func (r *Relay) Poll() (Message, bool) {
110
r.mu.Lock()
111
defer r.mu.Unlock()
112
if len(r.inbox) == 0 {
113
return Message{}, false
114
}
115
msg := r.inbox[0]
116
r.inbox = r.inbox[1:]
117
return msg, true
118
}
119
120
// Drain returns all buffered inbound messages and clears the inbox.
121
func (r *Relay) Drain() []Message {
122
r.mu.Lock()
123
defer r.mu.Unlock()
124
msgs := r.inbox
125
r.inbox = nil
126
return msgs
127
}
128
129
// WaitFor blocks until a message containing keyword arrives (case-insensitive),
130
// or until timeout. Returns an error if the timeout elapses or a message
131
// containing "no" or "stop" arrives first.
132
func (r *Relay) WaitFor(keyword string, timeout time.Duration) error {
133
deadline := time.Now().Add(timeout)
134
for time.Now().Before(deadline) {
135
msg, ok := r.Poll()
136
if ok {
137
lower := strings.ToLower(msg.Text)
138
if strings.Contains(lower, strings.ToLower(keyword)) {
139
return nil
140
}
141
if strings.Contains(lower, "no") || strings.Contains(lower, "stop") || strings.Contains(lower, "abort") {
142
return fmt.Errorf("agentrelay: operator said %q", msg.Text)
143
}
144
}
145
time.Sleep(250 * time.Millisecond)
146
}
147
return fmt.Errorf("agentrelay: timed out waiting for %q", keyword)
148
}
149
150
// Close stops the background SSE listener.
151
func (r *Relay) Close() {
152
if r.cancel != nil {
153
r.cancel()
154
}
155
}
156
157
// streamLoop maintains an SSE connection and feeds inbound messages into inbox.
158
func (r *Relay) streamLoop(ctx context.Context) {
159
slug := strings.TrimPrefix(r.cfg.Channel, "#")
160
url := r.cfg.ServerURL + "/v1/channels/" + slug + "/stream?token=" + r.cfg.Token
161
162
for {
163
if ctx.Err() != nil {
164
return
165
}
166
if err := r.stream(ctx, url); err != nil && ctx.Err() == nil {
167
// Back off briefly before reconnecting.
168
select {
169
case <-ctx.Done():
170
return
171
case <-time.After(3 * time.Second):
172
}
173
}
174
}
175
}
176
177
// stream opens one SSE connection to url and appends each inbound message to
// the inbox until the connection ends or ctx is cancelled.
//
// It returns an error if the request cannot be built or sent, if the server
// answers with a non-2xx status, or if reading the stream fails. It returns
// nil when the server closes the stream cleanly.
func (r *Relay) stream(ctx context.Context, url string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Accept", "text/event-stream")
	resp, err := r.http.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// An error response is not an event stream; report it rather than
	// scanning its body for events.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("agentrelay stream: unexpected status %s", resp.Status)
	}

	scanner := bufio.NewScanner(resp.Body)
	// The scanner's default line limit is 64 KiB; allow up to 1 MiB so a
	// single large event does not end the stream with bufio.ErrTooLong.
	scanner.Buffer(make([]byte, 0, 64*1024), 1<<20)
	for scanner.Scan() {
		line := scanner.Text()
		// Only data fields carry messages. The space after the colon is
		// optional in SSE; json.Unmarshal ignores leading whitespace.
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		data := line[len("data:"):]
		var msg Message
		if err := json.Unmarshal([]byte(data), &msg); err != nil {
			continue // not a JSON message payload; ignore
		}
		// Skip own messages and heartbeats.
		if msg.Nick == r.cfg.Nick || msg.Text == "" {
			continue
		}
		r.mu.Lock()
		r.inbox = append(r.inbox, msg)
		r.mu.Unlock()
	}
	return scanner.Err()
}
210
211
// Postf is a convenience wrapper for fmt.Sprintf + Post.
212
func (r *Relay) Postf(format string, args ...any) error {
213
return r.Post(fmt.Sprintf(format, args...))
214
}
215
216
// MustPost posts and panics on error. Useful in quick scripts.
217
func (r *Relay) MustPost(text string) {
218
if err := r.Post(text); err != nil {
219
panic(err)
220
}
221
}
222
223
// FetchHistory returns recent messages from the channel (for catching up).
224
func (r *Relay) FetchHistory(limit int) ([]Message, error) {
225
slug := strings.TrimPrefix(r.cfg.Channel, "#")
226
req, err := http.NewRequest("GET", fmt.Sprintf("%s/v1/channels/%s/messages", r.cfg.ServerURL, slug), nil)
227
if err != nil {
228
return nil, err
229
}
230
req.Header.Set("Authorization", "Bearer "+r.cfg.Token)
231
resp, err := r.http.Do(req)
232
if err != nil {
233
return nil, fmt.Errorf("agentrelay history: %w", err)
234
}
235
defer resp.Body.Close()
236
raw, _ := io.ReadAll(resp.Body)
237
var result struct {
238
Messages []Message `json:"messages"`
239
}
240
if err := json.Unmarshal(raw, &result); err != nil {
241
return nil, fmt.Errorf("agentrelay history parse: %w", err)
242
}
243
msgs := result.Messages
244
if limit > 0 && len(msgs) > limit {
245
msgs = msgs[len(msgs)-limit:]
246
}
247
// Filter out own messages.
248
out := msgs[:0]
249
for _, m := range msgs {
250
if m.Nick != r.cfg.Nick {
251
out = append(out, m)
252
}
253
}
254
return out, nil
255
}
256

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button