ScuttleBot

scuttlebot / cmd / codex-relay / main.go
Blame History Raw 1371 lines
1
package main
2
3
import (
4
"bufio"
5
"context"
6
"encoding/json"
7
"errors"
8
"fmt"
9
"hash/crc32"
10
"io"
11
"os"
12
"os/exec"
13
"os/signal"
14
"path/filepath"
15
"regexp"
16
"sort"
17
"strings"
18
"sync"
19
"syscall"
20
"time"
21
22
"github.com/conflicthq/scuttlebot/pkg/ircagent"
23
"github.com/conflicthq/scuttlebot/pkg/relaymirror"
24
"github.com/conflicthq/scuttlebot/pkg/sessionrelay"
25
"github.com/creack/pty"
26
"golang.org/x/term"
27
"gopkg.in/yaml.v3"
28
)
29
30
const (
31
defaultRelayURL = "http://localhost:8080"
32
defaultIRCAddr = "127.0.0.1:6667"
33
defaultChannel = "general"
34
defaultTransport = sessionrelay.TransportHTTP
35
defaultPollInterval = 2 * time.Second
36
defaultConnectWait = 30 * time.Second
37
defaultInjectDelay = 150 * time.Millisecond
38
defaultBusyWindow = 1500 * time.Millisecond
39
defaultHeartbeat = 60 * time.Second
40
defaultConfigFile = ".config/scuttlebot-relay.env"
41
defaultScanInterval = 250 * time.Millisecond
42
defaultDiscoverWait = 20 * time.Second
43
defaultMirrorLineMax = 360
44
)
45
46
// serviceBots is the set of nicks treated as infrastructure bots;
// filterMessages drops every message sent by one of them.
var serviceBots = map[string]struct{}{
	"auditbot":  {},
	"bridge":    {},
	"herald":    {},
	"oracle":    {},
	"scribe":    {},
	"scroll":    {},
	"sentinel":  {},
	"snitch":    {},
	"steward":   {},
	"systembot": {},
	"warden":    {},
}
59
60
// Patterns used by sanitizeSecrets to redact credentials before text is
// mirrored to the relay.
var (
	// secretHexPattern matches a standalone run of 32 or more lowercase hex digits.
	secretHexPattern = regexp.MustCompile(`\b[a-f0-9]{32,}\b`)

	// secretKeyPattern matches tokens that start with "sk-".
	secretKeyPattern = regexp.MustCompile(`\bsk-[A-Za-z0-9_-]+\b`)

	// bearerPattern captures "bearer " (case-insensitive) in group 1 and the
	// credential that follows in group 2.
	bearerPattern = regexp.MustCompile(`(?i)(bearer\s+)([A-Za-z0-9._:-]+)`)

	// assignTokenPattern captures NAME= in group 1 when NAME contains TOKEN,
	// KEY, SECRET, or PASSPHRASE (case-insensitive); group 3 is the value, which
	// ends at whitespace or a quote/backtick character.
	assignTokenPattern = regexp.MustCompile(`(?i)\b([A-Z0-9_]*(TOKEN|KEY|SECRET|PASSPHRASE)[A-Z0-9_]*=)([^ \t"'` + "`" + `]+)`)
)
66
67
type config struct {
68
CodexBin string
69
ConfigFile string
70
Transport sessionrelay.Transport
71
URL string
72
Token string
73
IRCAddr string
74
IRCPass string
75
IRCAgentType string
76
IRCDeleteOnClose bool
77
Channel string
78
Channels []string
79
ChannelStateFile string
80
SessionID string
81
Nick string
82
HooksEnabled bool
83
InterruptOnMessage bool
84
MirrorReasoning bool
85
PollInterval time.Duration
86
HeartbeatInterval time.Duration
87
TargetCWD string
88
Args []string
89
}
90
91
// message is a local alias for the relay's message type so the rest of this
// file can refer to it without the package qualifier.
type message = sessionrelay.Message
92
93
// mirrorLine is one unit of mirrored session output: the text to post plus
// an optional JSON metadata envelope (empty when there is none).
type mirrorLine struct {
	Text string          // human-readable line posted to the relay
	Meta json.RawMessage // pre-encoded metadata; posted alongside Text when non-empty
}
98
99
// relayState records when the child last looked busy so the input loop can
// decide whether to interrupt it. mu guards lastBusy.
type relayState struct {
	mu       sync.RWMutex
	lastBusy time.Time // zero until busy output has been observed
}
103
104
// sessionEnvelope is the outer JSON object of one session-file line. Payload
// is left undecoded so it can be parsed according to Type.
type sessionEnvelope struct {
	Type      string          `json:"type"`
	Timestamp string          `json:"timestamp"`
	Payload   json.RawMessage `json:"payload"`
}
109
110
// sessionMetaPayload holds the id, timestamp, and cwd fields of a session
// metadata payload.
type sessionMetaPayload struct {
	ID        string `json:"id"`
	Timestamp string `json:"timestamp"`
	Cwd       string `json:"cwd"`
}
115
116
type sessionResponsePayload struct {
117
Type string `json:"type"`
118
Name string `json:"name"`
119
Arguments string `json:"arguments"`
120
Input string `json:"input"`
121
Role string `json:"role"`
122
Phase string `json:"phase"`
123
Content []sessionContent `json:"content"`
124
}
125
126
// sessionContent is one content item of a message payload; Type selects how
// Text is mirrored ("output_text" or "reasoning").
type sessionContent struct {
	Type string `json:"type"`
	Text string `json:"text"`
}
130
131
// execCommandArgs is the subset of an "exec_command" call's JSON arguments
// that this file reads: the command line.
type execCommandArgs struct {
	Cmd string `json:"cmd"`
}
134
135
// parallelArgs is the JSON argument shape of a "parallel" call: a list of
// tool uses, each naming a recipient and carrying free-form parameters.
type parallelArgs struct {
	ToolUses []struct {
		RecipientName string         `json:"recipient_name"`
		Parameters    map[string]any `json:"parameters"`
	} `json:"tool_uses"`
}
141
142
func main() {
143
cfg, err := loadConfig(os.Args[1:])
144
if err != nil {
145
fmt.Fprintln(os.Stderr, "codex-relay:", err)
146
os.Exit(1)
147
}
148
149
if err := run(cfg); err != nil {
150
fmt.Fprintln(os.Stderr, "codex-relay:", err)
151
os.Exit(1)
152
}
153
}
154
155
func run(cfg config) error {
156
fmt.Fprintf(os.Stderr, "codex-relay: nick %s\n", cfg.Nick)
157
relayRequested := cfg.HooksEnabled && shouldRelaySession(cfg.Args)
158
159
ctx, cancel := context.WithCancel(context.Background())
160
defer cancel()
161
_ = sessionrelay.RemoveChannelStateFile(cfg.ChannelStateFile)
162
defer func() { _ = sessionrelay.RemoveChannelStateFile(cfg.ChannelStateFile) }()
163
164
var relay sessionrelay.Connector
165
relayActive := false
166
var onlineAt time.Time
167
if relayRequested {
168
conn, err := sessionrelay.New(sessionrelay.Config{
169
Transport: cfg.Transport,
170
URL: cfg.URL,
171
Token: cfg.Token,
172
Channel: cfg.Channel,
173
Channels: cfg.Channels,
174
Nick: cfg.Nick,
175
IRC: sessionrelay.IRCConfig{
176
Addr: cfg.IRCAddr,
177
Pass: cfg.IRCPass,
178
AgentType: cfg.IRCAgentType,
179
DeleteOnClose: cfg.IRCDeleteOnClose,
180
},
181
})
182
if err != nil {
183
fmt.Fprintf(os.Stderr, "codex-relay: relay disabled: %v\n", err)
184
} else {
185
connectCtx, connectCancel := context.WithTimeout(ctx, defaultConnectWait)
186
if err := conn.Connect(connectCtx); err != nil {
187
fmt.Fprintf(os.Stderr, "codex-relay: relay disabled: %v\n", err)
188
_ = conn.Close(context.Background())
189
} else {
190
relay = conn
191
relayActive = true
192
if err := sessionrelay.WriteChannelStateFile(cfg.ChannelStateFile, relay.ControlChannel(), relay.Channels()); err != nil {
193
fmt.Fprintf(os.Stderr, "codex-relay: channel state disabled: %v\n", err)
194
}
195
onlineAt = time.Now()
196
_ = relay.Post(context.Background(), fmt.Sprintf(
197
"online in %s; mention %s to interrupt before the next action",
198
filepath.Base(cfg.TargetCWD), cfg.Nick,
199
))
200
}
201
connectCancel()
202
}
203
}
204
if relay != nil {
205
defer func() {
206
closeCtx, closeCancel := context.WithTimeout(context.Background(), defaultConnectWait)
207
defer closeCancel()
208
_ = relay.Close(closeCtx)
209
}()
210
}
211
212
cmd := exec.Command(cfg.CodexBin, cfg.Args...)
213
// Snapshot existing session files before starting the subprocess so
214
// discovery can distinguish our session from pre-existing ones.
215
var preExisting map[string]struct{}
216
if sessRoot, err := codexSessionsRoot(); err == nil {
217
preExisting = snapshotSessionFiles(sessRoot)
218
}
219
220
startedAt := time.Now()
221
cmd.Env = append(os.Environ(),
222
"SCUTTLEBOT_CONFIG_FILE="+cfg.ConfigFile,
223
"SCUTTLEBOT_URL="+cfg.URL,
224
"SCUTTLEBOT_TOKEN="+cfg.Token,
225
"SCUTTLEBOT_CHANNEL="+cfg.Channel,
226
"SCUTTLEBOT_CHANNELS="+strings.Join(cfg.Channels, ","),
227
"SCUTTLEBOT_CHANNEL_STATE_FILE="+cfg.ChannelStateFile,
228
"SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled),
229
"SCUTTLEBOT_SESSION_ID="+cfg.SessionID,
230
"SCUTTLEBOT_NICK="+cfg.Nick,
231
"SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive),
232
)
233
var ptyMirror *relaymirror.PTYMirror
234
if relayActive {
235
ptyMirror = relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) {
236
// no-op: session file mirror handles IRC output
237
})
238
go mirrorSessionLoop(ctx, relay, cfg, startedAt, preExisting, ptyMirror)
239
go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval)
240
}
241
242
if !isInteractiveTTY() {
243
cmd.Stdin = os.Stdin
244
cmd.Stdout = os.Stdout
245
cmd.Stderr = os.Stderr
246
err := cmd.Run()
247
if err != nil {
248
exitCode := exitStatus(err)
249
if relayActive {
250
_ = relay.Post(context.Background(), fmt.Sprintf("offline (exit %d)", exitCode))
251
}
252
return err
253
}
254
if relayActive {
255
_ = relay.Post(context.Background(), "offline (exit 0)")
256
}
257
return nil
258
}
259
260
ptmx, err := pty.Start(cmd)
261
if err != nil {
262
return err
263
}
264
defer func() { _ = ptmx.Close() }()
265
266
state := &relayState{}
267
268
if err := pty.InheritSize(os.Stdin, ptmx); err == nil {
269
resizeCh := make(chan os.Signal, 1)
270
signal.Notify(resizeCh, syscall.SIGWINCH)
271
defer signal.Stop(resizeCh)
272
go func() {
273
for range resizeCh {
274
_ = pty.InheritSize(os.Stdin, ptmx)
275
}
276
}()
277
resizeCh <- syscall.SIGWINCH
278
}
279
280
oldState, err := term.MakeRaw(int(os.Stdin.Fd()))
281
if err != nil {
282
return err
283
}
284
defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }()
285
286
go func() {
287
_, _ = io.Copy(ptmx, os.Stdin)
288
}()
289
if ptyMirror != nil {
290
ptyMirror.BusyCallback = func(now time.Time) {
291
state.mu.Lock()
292
state.lastBusy = now
293
state.mu.Unlock()
294
}
295
go func() {
296
_ = ptyMirror.Copy(ptmx, os.Stdout)
297
}()
298
} else {
299
go func() {
300
copyPTYOutput(ptmx, os.Stdout, state)
301
}()
302
}
303
if relayActive {
304
go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt)
305
go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt)
306
}
307
308
err = cmd.Wait()
309
cancel()
310
311
exitCode := exitStatus(err)
312
if relayActive {
313
_ = relay.Post(context.Background(), fmt.Sprintf("offline (exit %d)", exitCode))
314
}
315
return err
316
}
317
318
func relayInputLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, state *relayState, ptyFile *os.File, since time.Time) {
319
lastSeen := since
320
ticker := time.NewTicker(cfg.PollInterval)
321
defer ticker.Stop()
322
323
for {
324
select {
325
case <-ctx.Done():
326
return
327
case <-ticker.C:
328
messages, err := relay.MessagesSince(ctx, lastSeen)
329
if err != nil {
330
continue
331
}
332
batch, newest := filterMessages(messages, lastSeen, cfg.Nick, cfg.IRCAgentType)
333
if len(batch) == 0 {
334
continue
335
}
336
lastSeen = newest
337
pending := make([]message, 0, len(batch))
338
for _, msg := range batch {
339
handled, err := handleRelayCommand(ctx, relay, cfg, msg)
340
if err != nil {
341
if ctx.Err() == nil {
342
_ = relay.Post(context.Background(), fmt.Sprintf("input loop error: %v — session may be unsteerable", err))
343
}
344
return
345
}
346
if handled {
347
continue
348
}
349
pending = append(pending, msg)
350
}
351
if len(pending) == 0 {
352
continue
353
}
354
if err := injectMessages(ptyFile, cfg, state, relay.ControlChannel(), pending); err != nil {
355
if ctx.Err() == nil {
356
_ = relay.Post(context.Background(), fmt.Sprintf("input loop error: %v — session may be unsteerable", err))
357
}
358
return
359
}
360
}
361
}
362
}
363
364
func handleReconnectSignal(ctx context.Context, relayPtr *sessionrelay.Connector, cfg config, state *relayState, ptmx *os.File, startedAt time.Time) {
365
sigCh := make(chan os.Signal, 1)
366
signal.Notify(sigCh, syscall.SIGUSR1)
367
defer signal.Stop(sigCh)
368
369
for {
370
select {
371
case <-ctx.Done():
372
return
373
case <-sigCh:
374
}
375
376
fmt.Fprintf(os.Stderr, "codex-relay: received SIGUSR1, reconnecting IRC...\n")
377
old := *relayPtr
378
if old != nil {
379
_ = old.Close(context.Background())
380
}
381
382
// Retry with backoff.
383
wait := 2 * time.Second
384
for attempt := 0; attempt < 10; attempt++ {
385
if ctx.Err() != nil {
386
return
387
}
388
time.Sleep(wait)
389
390
conn, err := sessionrelay.New(sessionrelay.Config{
391
Transport: cfg.Transport,
392
URL: cfg.URL,
393
Token: cfg.Token,
394
Channel: cfg.Channel,
395
Channels: cfg.Channels,
396
Nick: cfg.Nick,
397
IRC: sessionrelay.IRCConfig{
398
Addr: cfg.IRCAddr,
399
Pass: "", // force re-registration
400
AgentType: cfg.IRCAgentType,
401
DeleteOnClose: cfg.IRCDeleteOnClose,
402
},
403
})
404
if err != nil {
405
wait = min(wait*2, 30*time.Second)
406
continue
407
}
408
409
connectCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
410
if err := conn.Connect(connectCtx); err != nil {
411
_ = conn.Close(context.Background())
412
cancel()
413
wait = min(wait*2, 30*time.Second)
414
continue
415
}
416
cancel()
417
418
*relayPtr = conn
419
now := time.Now()
420
_ = conn.Post(context.Background(), fmt.Sprintf(
421
"reconnected in %s; mention %s to interrupt",
422
filepath.Base(cfg.TargetCWD), cfg.Nick,
423
))
424
fmt.Fprintf(os.Stderr, "codex-relay: reconnected, restarting mirror and input loops\n")
425
426
// Restart mirror and input loops with the new connector.
427
// Use epoch time for mirror so it finds the existing session file
428
// regardless of when it was last modified.
429
go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil, nil)
430
go relayInputLoop(ctx, conn, cfg, state, ptmx, now)
431
break
432
}
433
}
434
}
435
436
func presenceLoopPtr(ctx context.Context, relayPtr *sessionrelay.Connector, interval time.Duration) {
437
if interval <= 0 {
438
return
439
}
440
ticker := time.NewTicker(interval)
441
defer ticker.Stop()
442
for {
443
select {
444
case <-ctx.Done():
445
return
446
case <-ticker.C:
447
if r := *relayPtr; r != nil {
448
_ = r.Touch(ctx)
449
}
450
}
451
}
452
}
453
454
// injectMessages formats batch as an "[IRC operator messages]" block, echoes
// it to the local stdout, and writes it to writer followed by a carriage
// return.
//
// Each line reads "[channel] nick: text"; the channel is shown without its
// leading "#", and a message with no channel is labelled with controlChannel.
// When cfg.InterruptOnMessage is set and state reports recent busy output, an
// interrupt byte (0x03) is written first, followed by a short pause.
func injectMessages(writer io.Writer, cfg config, state *relayState, controlChannel string, batch []message) error {
	var block strings.Builder
	block.WriteString("[IRC operator messages]\n")
	for _, msg := range batch {
		raw := strings.TrimSpace(msg.Text)
		text := ircagent.TrimAddressedText(raw, cfg.Nick)
		if text == "" {
			// Nothing left after removing the address: keep the raw text.
			text = raw
		}
		channel := msg.Channel
		if channel == "" {
			channel = controlChannel
		}
		fmt.Fprintf(&block, "[%s] %s: %s\n", strings.TrimPrefix(channel, "#"), msg.Nick, text)
	}
	payload := block.String()

	// Show the operator block on the local terminal as well.
	_, _ = os.Stdout.WriteString("\r\n" + payload + "\r\n")

	if cfg.InterruptOnMessage && state.shouldInterrupt(time.Now()) {
		if _, err := writer.Write([]byte{3}); err != nil {
			return err
		}
		time.Sleep(defaultInjectDelay)
	}

	if _, err := writer.Write([]byte(payload)); err != nil {
		return err
	}
	_, err := writer.Write([]byte{'\r'})
	return err
}
494
495
func handleRelayCommand(ctx context.Context, relay sessionrelay.Connector, cfg config, msg message) (bool, error) {
496
text := ircagent.TrimAddressedText(strings.TrimSpace(msg.Text), cfg.Nick)
497
if text == "" {
498
text = strings.TrimSpace(msg.Text)
499
}
500
501
cmd, ok := sessionrelay.ParseBrokerCommand(text)
502
if !ok {
503
return false, nil
504
}
505
506
postStatus := func(channel, text string) error {
507
if channel == "" {
508
channel = relay.ControlChannel()
509
}
510
return relay.PostTo(ctx, channel, text)
511
}
512
513
switch cmd.Name {
514
case "channels":
515
return true, postStatus(msg.Channel, fmt.Sprintf("channels: %s (control %s)", sessionrelay.FormatChannels(relay.Channels()), relay.ControlChannel()))
516
case "join":
517
if cmd.Channel == "" {
518
return true, postStatus(msg.Channel, "usage: /join #channel")
519
}
520
if err := relay.JoinChannel(ctx, cmd.Channel); err != nil {
521
return true, postStatus(msg.Channel, fmt.Sprintf("join %s failed: %v", cmd.Channel, err))
522
}
523
if err := sessionrelay.WriteChannelStateFile(cfg.ChannelStateFile, relay.ControlChannel(), relay.Channels()); err != nil {
524
return true, postStatus(msg.Channel, fmt.Sprintf("joined %s, but channel state update failed: %v", cmd.Channel, err))
525
}
526
return true, postStatus(msg.Channel, fmt.Sprintf("joined %s; channels: %s", cmd.Channel, sessionrelay.FormatChannels(relay.Channels())))
527
case "part":
528
if cmd.Channel == "" {
529
return true, postStatus(msg.Channel, "usage: /part #channel")
530
}
531
if err := relay.PartChannel(ctx, cmd.Channel); err != nil {
532
return true, postStatus(msg.Channel, fmt.Sprintf("part %s failed: %v", cmd.Channel, err))
533
}
534
if err := sessionrelay.WriteChannelStateFile(cfg.ChannelStateFile, relay.ControlChannel(), relay.Channels()); err != nil {
535
return true, postStatus(msg.Channel, fmt.Sprintf("parted %s, but channel state update failed: %v", cmd.Channel, err))
536
}
537
replyChannel := msg.Channel
538
if sameChannel(replyChannel, cmd.Channel) {
539
replyChannel = relay.ControlChannel()
540
}
541
return true, postStatus(replyChannel, fmt.Sprintf("parted %s; channels: %s", cmd.Channel, sessionrelay.FormatChannels(relay.Channels())))
542
default:
543
return false, nil
544
}
545
}
546
547
// copyPTYOutput streams src to dst in chunks of up to 4096 bytes, passing
// every chunk to state.observeOutput before it is written. It returns on the
// first write error or read error (including end of input); data delivered
// together with a read error is still forwarded first.
func copyPTYOutput(src io.Reader, dst io.Writer, state *relayState) {
	chunk := make([]byte, 4096)
	for {
		n, readErr := src.Read(chunk)
		if n > 0 {
			data := chunk[:n]
			state.observeOutput(data, time.Now())
			if _, err := dst.Write(data); err != nil {
				return
			}
		}
		if readErr != nil {
			return
		}
	}
}
562
563
// observeOutput records now as the last busy time when data contains the
// text "esc to interrupt" (matched case-insensitively). It is a no-op on a
// nil receiver.
func (s *relayState) observeOutput(data []byte, now time.Time) {
	if s == nil {
		return
	}
	if !strings.Contains(strings.ToLower(string(data)), "esc to interrupt") {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastBusy = now
}
573
574
// shouldInterrupt reports whether busy output was recorded no more than
// defaultBusyWindow before now. It returns false on a nil receiver and when
// no busy output has been recorded yet.
func (s *relayState) shouldInterrupt(now time.Time) bool {
	if s == nil {
		return false
	}

	s.mu.RLock()
	last := s.lastBusy
	s.mu.RUnlock()

	if last.IsZero() {
		return false
	}
	return now.Sub(last) <= defaultBusyWindow
}
583
584
// filterMessages selects the messages that should be acted on and returns
// them ordered by timestamp, together with the newest timestamp seen.
//
// Messages with a zero timestamp or one not after since are ignored
// entirely. Of the rest, a message is dropped when it was sent by nick
// itself, by a known service bot, or by a nick with an activity prefix, or
// when its text neither mentions nick nor matches a group mention for
// agentType. newest covers every message after since, including dropped
// ones; it equals since when there are none.
//
// The sort is stable, so messages sharing a timestamp keep the order in
// which the relay delivered them.
func filterMessages(messages []message, since time.Time, nick, agentType string) ([]message, time.Time) {
	filtered := make([]message, 0, len(messages))
	newest := since
	for _, msg := range messages {
		if msg.At.IsZero() || !msg.At.After(since) {
			continue
		}
		// Track the high-water mark before applying the sender/mention filters.
		if msg.At.After(newest) {
			newest = msg.At
		}
		if msg.Nick == nick {
			continue
		}
		if _, ok := serviceBots[msg.Nick]; ok {
			continue
		}
		if ircagent.HasAnyPrefix(msg.Nick, ircagent.DefaultActivityPrefixes()) {
			continue
		}
		if !ircagent.MentionsNick(msg.Text, nick) && !ircagent.MatchesGroupMention(msg.Text, nick, agentType) {
			continue
		}
		filtered = append(filtered, msg)
	}
	sort.SliceStable(filtered, func(i, j int) bool {
		return filtered[i].At.Before(filtered[j].At)
	})
	return filtered, newest
}
613
614
func loadConfig(args []string) (config, error) {
615
fileConfig := readEnvFile(configFilePath())
616
617
cfg := config{
618
CodexBin: getenvOr(fileConfig, "CODEX_BIN", "codex"),
619
ConfigFile: getenvOr(fileConfig, "SCUTTLEBOT_CONFIG_FILE", configFilePath()),
620
Transport: sessionrelay.Transport(strings.ToLower(getenvOr(fileConfig, "SCUTTLEBOT_TRANSPORT", string(defaultTransport)))),
621
URL: getenvOr(fileConfig, "SCUTTLEBOT_URL", defaultRelayURL),
622
Token: getenvOr(fileConfig, "SCUTTLEBOT_TOKEN", ""),
623
IRCAddr: getenvOr(fileConfig, "SCUTTLEBOT_IRC_ADDR", defaultIRCAddr),
624
IRCPass: getenvOr(fileConfig, "SCUTTLEBOT_IRC_PASS", ""),
625
IRCAgentType: getenvOr(fileConfig, "SCUTTLEBOT_IRC_AGENT_TYPE", "worker"),
626
IRCDeleteOnClose: getenvBoolOr(fileConfig, "SCUTTLEBOT_IRC_DELETE_ON_CLOSE", true),
627
HooksEnabled: getenvBoolOr(fileConfig, "SCUTTLEBOT_HOOKS_ENABLED", true),
628
InterruptOnMessage: getenvBoolOr(fileConfig, "SCUTTLEBOT_INTERRUPT_ON_MESSAGE", true),
629
MirrorReasoning: getenvBoolOr(fileConfig, "SCUTTLEBOT_MIRROR_REASONING", true),
630
PollInterval: getenvDurationOr(fileConfig, "SCUTTLEBOT_POLL_INTERVAL", defaultPollInterval),
631
HeartbeatInterval: getenvDurationAllowZeroOr(fileConfig, "SCUTTLEBOT_PRESENCE_HEARTBEAT", defaultHeartbeat),
632
Args: append([]string(nil), args...),
633
}
634
635
controlChannel := getenvOr(fileConfig, "SCUTTLEBOT_CHANNEL", defaultChannel)
636
cfg.Channels = sessionrelay.ChannelSlugs(sessionrelay.ParseEnvChannels(controlChannel, getenvOr(fileConfig, "SCUTTLEBOT_CHANNELS", "")))
637
if len(cfg.Channels) > 0 {
638
cfg.Channel = cfg.Channels[0]
639
}
640
641
target, err := targetCWD(args)
642
if err != nil {
643
return config{}, err
644
}
645
cfg.TargetCWD = target
646
647
// Merge per-repo config if present.
648
if rc, err := loadRepoConfig(target); err == nil && rc != nil {
649
cfg.Channels = mergeChannels(cfg.Channels, rc.allChannels())
650
}
651
652
sessionID := getenvOr(fileConfig, "SCUTTLEBOT_SESSION_ID", "")
653
if sessionID == "" {
654
sessionID = getenvOr(fileConfig, "CODEX_SESSION_ID", "")
655
}
656
if sessionID == "" {
657
sessionID = defaultSessionID(target)
658
}
659
cfg.SessionID = sanitize(sessionID)
660
661
nick := getenvOr(fileConfig, "SCUTTLEBOT_NICK", "")
662
if nick == "" {
663
nick = fmt.Sprintf("codex-%s-%s", sanitize(filepath.Base(target)), cfg.SessionID)
664
}
665
cfg.Nick = sanitize(nick)
666
cfg.ChannelStateFile = getenvOr(fileConfig, "SCUTTLEBOT_CHANNEL_STATE_FILE", defaultChannelStateFile(cfg.Nick))
667
668
if cfg.Channel == "" {
669
cfg.Channel = defaultChannel
670
cfg.Channels = []string{defaultChannel}
671
}
672
if cfg.Transport == sessionrelay.TransportHTTP && cfg.Token == "" {
673
cfg.HooksEnabled = false
674
}
675
return cfg, nil
676
}
677
678
func defaultChannelStateFile(nick string) string {
679
return filepath.Join(os.TempDir(), fmt.Sprintf(".scuttlebot-channels-%s.env", sanitize(nick)))
680
}
681
682
// sameChannel reports whether a and b name the same channel once a single
// leading "#" is removed from each. The comparison is case-sensitive.
func sameChannel(a, b string) bool {
	left := strings.TrimPrefix(a, "#")
	right := strings.TrimPrefix(b, "#")
	return left == right
}
685
686
func configFilePath() string {
687
if value := os.Getenv("SCUTTLEBOT_CONFIG_FILE"); value != "" {
688
return value
689
}
690
home, err := os.UserHomeDir()
691
if err != nil {
692
return defaultConfigFile
693
}
694
return filepath.Join(home, defaultConfigFile)
695
}
696
697
// readEnvFile parses a shell-style KEY=VALUE file into a map.
//
// Blank lines, lines starting with "#", and lines without "=" are skipped. A
// leading "export " is ignored. Keys and values are trimmed of surrounding
// whitespace, and quote characters (" or ') around a value are removed even
// when whitespace separates the "=" from the opening quote.
//
// Reading is best-effort: a file that cannot be opened yields an empty,
// non-nil map, and a read error simply ends parsing with whatever was
// collected so far.
func readEnvFile(path string) map[string]string {
	values := make(map[string]string)
	file, err := os.Open(path)
	if err != nil {
		return values
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		line = strings.TrimPrefix(line, "export ")
		key, value, ok := strings.Cut(line, "=")
		if !ok {
			continue
		}
		// Trim whitespace first so quotes separated from "=" by spaces are
		// still recognised, then trim again for padding inside the quotes.
		value = strings.Trim(strings.TrimSpace(value), `"'`)
		values[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return values
}
720
721
// getenvOr resolves key with this precedence: a non-empty environment
// variable, then a non-empty entry in file, then fallback.
func getenvOr(file map[string]string, key, fallback string) string {
	for _, candidate := range []string{os.Getenv(key), file[key]} {
		if candidate != "" {
			return candidate
		}
	}
	return fallback
}
730
731
func getenvBoolOr(file map[string]string, key string, fallback bool) bool {
732
value := getenvOr(file, key, "")
733
if value == "" {
734
return fallback
735
}
736
switch strings.ToLower(value) {
737
case "0", "false", "no", "off":
738
return false
739
default:
740
return true
741
}
742
}
743
744
func getenvDurationOr(file map[string]string, key string, fallback time.Duration) time.Duration {
745
value := getenvOr(file, key, "")
746
if value == "" {
747
return fallback
748
}
749
if strings.IndexFunc(value, func(r rune) bool { return r < '0' || r > '9' }) == -1 {
750
value += "s"
751
}
752
d, err := time.ParseDuration(value)
753
if err != nil || d <= 0 {
754
return fallback
755
}
756
return d
757
}
758
759
func getenvDurationAllowZeroOr(file map[string]string, key string, fallback time.Duration) time.Duration {
760
value := getenvOr(file, key, "")
761
if value == "" {
762
return fallback
763
}
764
if strings.IndexFunc(value, func(r rune) bool { return r < '0' || r > '9' }) == -1 {
765
value += "s"
766
}
767
d, err := time.ParseDuration(value)
768
if err != nil || d < 0 {
769
return fallback
770
}
771
return d
772
}
773
774
// targetCWD determines the directory the child will work in. It starts from
// the current working directory and applies every "-C <dir>", "--cd <dir>",
// "-C=<dir>", or "--cd=<dir>" found in args, the last one winning. A trailing
// "-C"/"--cd" with no value is ignored. An absolute result is returned as
// given; a relative one is made absolute with filepath.Abs.
func targetCWD(args []string) (string, error) {
	target, err := os.Getwd()
	if err != nil {
		return "", err
	}

	for i := 0; i < len(args); i++ {
		arg := args[i]
		if arg == "-C" || arg == "--cd" {
			// The next argument, whatever it is, is the directory.
			if i+1 < len(args) {
				i++
				target = args[i]
			}
			continue
		}
		if dir, ok := strings.CutPrefix(arg, "-C="); ok {
			target = dir
			continue
		}
		if dir, ok := strings.CutPrefix(arg, "--cd="); ok {
			target = dir
		}
	}

	if filepath.IsAbs(target) {
		return target, nil
	}
	return filepath.Abs(target)
}
801
802
// sanitize reduces value to ASCII letters, digits, "-", and "_": every other
// rune becomes "-", and leading and trailing "-" are then removed. When
// nothing remains, "session" is returned.
func sanitize(value string) string {
	keepOrDash := func(r rune) rune {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			return r
		case r == '-', r == '_':
			return r
		}
		return '-'
	}

	cleaned := strings.Trim(strings.Map(keepOrDash, value), "-")
	if cleaned == "" {
		return "session"
	}
	return cleaned
}
824
825
// defaultSessionID generates an 8-digit lowercase hex identifier: the CRC-32
// (IEEE) of target, the process ID, the parent process ID, and the current
// time in nanoseconds.
func defaultSessionID(target string) string {
	seed := fmt.Sprintf("%s|%d|%d|%d", target, os.Getpid(), os.Getppid(), time.Now().UnixNano())
	checksum := crc32.ChecksumIEEE([]byte(seed))
	return fmt.Sprintf("%08x", checksum)
}
829
830
// mirrorSessionLoop locates the session file for this run and forwards each
// line it produces to the relay until ctx is cancelled.
//
// Discovery failures are retried after 10 seconds and tailing failures after
// 5 seconds; both waits end immediately when ctx is cancelled. Every mirrored
// line is marked as seen on ptyDedup (when non-nil) and posted with its
// metadata when the line carries any. Post errors are ignored.
func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, preExisting map[string]struct{}, ptyDedup *relaymirror.PTYMirror) {
	// pause waits for d and reports false if ctx was cancelled first.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	// forward posts one mirrored record, split into relay-sized lines.
	forward := func(ml mirrorLine) {
		for _, line := range splitMirrorText(ml.Text) {
			if line == "" {
				continue
			}
			if ptyDedup != nil {
				ptyDedup.MarkSeen(line)
			}
			if len(ml.Meta) > 0 {
				_ = relay.PostWithMeta(ctx, line, ml.Meta)
			} else {
				_ = relay.Post(ctx, line)
			}
		}
	}

	for {
		if ctx.Err() != nil {
			return
		}
		sessionPath, err := discoverSessionPath(ctx, cfg, startedAt, preExisting)
		if err != nil {
			if !pause(10 * time.Second) {
				return
			}
			continue
		}
		if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, forward); err != nil && ctx.Err() == nil {
			if !pause(5 * time.Second) {
				return
			}
			continue
		}
		return
	}
}
864
865
// discoverSessionPath waits for the session file belonging to this run. When
// the arguments name an explicit thread ID the file is looked up by that ID;
// otherwise the latest session for the cleaned target directory is used,
// searching from two seconds before startedAt and passing preExisting along
// to the lookup.
func discoverSessionPath(ctx context.Context, cfg config, startedAt time.Time, preExisting map[string]struct{}) (string, error) {
	root, err := codexSessionsRoot()
	if err != nil {
		return "", err
	}

	var find func() (string, error)
	if threadID := explicitThreadID(cfg.Args); threadID != "" {
		find = func() (string, error) {
			return findSessionPathByThreadID(root, threadID)
		}
	} else {
		target := filepath.Clean(cfg.TargetCWD)
		notBefore := startedAt.Add(-2 * time.Second)
		find = func() (string, error) {
			return findLatestSessionPath(root, target, notBefore, preExisting)
		}
	}
	return waitForSessionPath(ctx, find)
}
882
883
func waitForSessionPath(ctx context.Context, find func() (string, error)) (string, error) {
884
ctx, cancel := context.WithTimeout(ctx, defaultDiscoverWait)
885
defer cancel()
886
887
ticker := time.NewTicker(defaultScanInterval)
888
defer ticker.Stop()
889
890
for {
891
path, err := find()
892
if err == nil && path != "" {
893
return path, nil
894
}
895
select {
896
case <-ctx.Done():
897
return "", ctx.Err()
898
case <-ticker.C:
899
}
900
}
901
}
902
903
func tailSessionFile(ctx context.Context, path string, mirrorReasoning bool, emit func(mirrorLine)) error {
904
file, err := os.Open(path)
905
if err != nil {
906
return err
907
}
908
defer file.Close()
909
910
if _, err := file.Seek(0, io.SeekEnd); err != nil {
911
return err
912
}
913
914
reader := bufio.NewReader(file)
915
for {
916
line, err := reader.ReadBytes('\n')
917
if len(line) > 0 {
918
for _, ml := range sessionMessages(line, mirrorReasoning) {
919
if ml.Text != "" {
920
emit(ml)
921
}
922
}
923
}
924
if err == nil {
925
continue
926
}
927
if errors.Is(err, io.EOF) {
928
select {
929
case <-ctx.Done():
930
return nil
931
case <-time.After(defaultScanInterval):
932
}
933
continue
934
}
935
return err
936
}
937
}
938
939
func sessionMessages(line []byte, mirrorReasoning bool) []mirrorLine {
940
var env sessionEnvelope
941
if err := json.Unmarshal(line, &env); err != nil {
942
return nil
943
}
944
if env.Type != "response_item" {
945
return nil
946
}
947
948
var payload sessionResponsePayload
949
if err := json.Unmarshal(env.Payload, &payload); err != nil {
950
return nil
951
}
952
953
switch payload.Type {
954
case "function_call":
955
if msg := summarizeFunctionCall(payload.Name, payload.Arguments); msg != "" {
956
meta := codexToolMeta(payload.Name, payload.Arguments)
957
return []mirrorLine{{Text: msg, Meta: meta}}
958
}
959
case "custom_tool_call":
960
if msg := summarizeCustomToolCall(payload.Name, payload.Input); msg != "" {
961
meta := codexToolMeta(payload.Name, payload.Input)
962
return []mirrorLine{{Text: msg, Meta: meta}}
963
}
964
case "message":
965
if payload.Role != "assistant" {
966
return nil
967
}
968
return flattenAssistantContent(payload.Content, mirrorReasoning)
969
}
970
return nil
971
}
972
973
// codexToolMeta builds a JSON metadata envelope for a Codex tool call.
974
func codexToolMeta(name, argsJSON string) json.RawMessage {
975
data := map[string]string{"tool": name}
976
switch name {
977
case "exec_command":
978
var args execCommandArgs
979
if err := json.Unmarshal([]byte(argsJSON), &args); err == nil && args.Cmd != "" {
980
data["command"] = sanitizeSecrets(args.Cmd)
981
}
982
case "apply_patch":
983
files := patchTargets(argsJSON)
984
if len(files) > 0 {
985
data["file"] = files[0]
986
}
987
}
988
meta := map[string]any{"type": "tool_result", "data": data}
989
b, _ := json.Marshal(meta)
990
return b
991
}
992
993
func summarizeFunctionCall(name, argsJSON string) string {
994
switch name {
995
case "exec_command":
996
var args execCommandArgs
997
if err := json.Unmarshal([]byte(argsJSON), &args); err == nil && strings.TrimSpace(args.Cmd) != "" {
998
return "› " + sanitizeSecrets(compactCommand(args.Cmd))
999
}
1000
return "› command"
1001
case "parallel":
1002
var args parallelArgs
1003
if err := json.Unmarshal([]byte(argsJSON), &args); err == nil && len(args.ToolUses) > 0 {
1004
return fmt.Sprintf("parallel %d tools", len(args.ToolUses))
1005
}
1006
return "parallel"
1007
case "update_plan":
1008
return "plan updated"
1009
case "spawn_agent":
1010
return "spawn agent"
1011
default:
1012
if name == "" {
1013
return ""
1014
}
1015
return name
1016
}
1017
}
1018
1019
func summarizeCustomToolCall(name, input string) string {
1020
switch name {
1021
case "apply_patch":
1022
files := patchTargets(input)
1023
if len(files) == 0 {
1024
return "patch"
1025
}
1026
if len(files) == 1 {
1027
return "patch " + files[0]
1028
}
1029
return fmt.Sprintf("patch %d files: %s", len(files), strings.Join(files, ", "))
1030
default:
1031
if name == "" {
1032
return ""
1033
}
1034
return name
1035
}
1036
}
1037
1038
func flattenAssistantContent(content []sessionContent, mirrorReasoning bool) []mirrorLine {
1039
var lines []mirrorLine
1040
for _, item := range content {
1041
switch item.Type {
1042
case "output_text":
1043
for _, line := range splitMirrorText(item.Text) {
1044
if line != "" {
1045
lines = append(lines, mirrorLine{Text: line})
1046
}
1047
}
1048
case "reasoning":
1049
if mirrorReasoning {
1050
for _, line := range splitMirrorText(item.Text) {
1051
if line != "" {
1052
lines = append(lines, mirrorLine{Text: "💭 " + line})
1053
}
1054
}
1055
}
1056
}
1057
}
1058
return lines
1059
}
1060
1061
// compactCommand shortens a shell command for display. Runs of whitespace
// collapse to single spaces, a leading "cd <dir> && " is dropped, and
// anything longer than 140 bytes is cut and suffixed with "...".
//
// The cut never lands inside a multi-byte UTF-8 sequence: it backs up to the
// nearest rune start at or before byte 140, so the result stays valid UTF-8
// whenever the input is.
func compactCommand(cmd string) string {
	const maxLen = 140

	trimmed := strings.Join(strings.Fields(cmd), " ")
	if strings.HasPrefix(trimmed, "cd ") {
		if idx := strings.Index(trimmed, " && "); idx > 0 {
			trimmed = strings.TrimSpace(trimmed[idx+4:])
		}
	}
	if len(trimmed) <= maxLen {
		return trimmed
	}

	cut := maxLen
	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes.
	for cut > 0 && trimmed[cut]&0xC0 == 0x80 {
		cut--
	}
	if cut == 0 {
		// Not valid UTF-8 at all: fall back to a plain byte cut.
		cut = maxLen
	}
	return trimmed[:cut] + "..."
}
1074
1075
func sanitizeSecrets(text string) string {
1076
if text == "" {
1077
return ""
1078
}
1079
text = bearerPattern.ReplaceAllString(text, "${1}[redacted]")
1080
text = assignTokenPattern.ReplaceAllString(text, "${1}[redacted]")
1081
text = secretKeyPattern.ReplaceAllString(text, "[redacted]")
1082
text = secretHexPattern.ReplaceAllString(text, "[redacted]")
1083
return text
1084
}
1085
1086
func splitMirrorText(text string) []string {
1087
clean := strings.ReplaceAll(text, "\r\n", "\n")
1088
clean = strings.ReplaceAll(clean, "\r", "\n")
1089
raw := strings.Split(clean, "\n")
1090
var out []string
1091
for _, line := range raw {
1092
line = strings.TrimSpace(line)
1093
if line == "" {
1094
continue
1095
}
1096
for len(line) > defaultMirrorLineMax {
1097
cut := strings.LastIndex(line[:defaultMirrorLineMax], " ")
1098
if cut <= 0 {
1099
cut = defaultMirrorLineMax
1100
}
1101
out = append(out, line[:cut])
1102
line = strings.TrimSpace(line[cut:])
1103
}
1104
if line != "" {
1105
out = append(out, line)
1106
}
1107
}
1108
return out
1109
}
1110
1111
// patchTargets extracts the file paths named by the "*** Add File: ",
// "*** Update File: " and "*** Delete File: " header lines of a patch.
//
// Each input line is whitespace-trimmed before matching, and the path is
// trimmed again after the header is removed. Paths are returned in the order
// they appear; the result is nil when no header line is present.
func patchTargets(input string) []string {
	markers := [...]string{"*** Add File: ", "*** Update File: ", "*** Delete File: "}

	var targets []string
	for _, raw := range strings.Split(input, "\n") {
		entry := strings.TrimSpace(raw)
		for _, marker := range markers {
			if !strings.HasPrefix(entry, marker) {
				continue
			}
			targets = append(targets, strings.TrimSpace(entry[len(marker):]))
			break
		}
	}
	return targets
}
1124
1125
// explicitThreadID returns the whitespace-trimmed argument that immediately
// follows the first "resume" in args. It returns "" when "resume" is absent
// or is the final argument.
func explicitThreadID(args []string) string {
	for pos, arg := range args {
		next := pos + 1
		if arg == "resume" && next < len(args) {
			return strings.TrimSpace(args[next])
		}
	}
	return ""
}
1133
1134
// snapshotSessionFiles collects the paths of all non-directory entries under
// root whose path ends in ".jsonl". It is meant to be called before the Codex
// subprocess starts, so that later discovery can ignore sessions that already
// existed and single out the one the subprocess creates.
//
// The walk is best-effort: entries that cannot be read are skipped, and the
// returned set is always non-nil (empty when root is missing or unreadable).
func snapshotSessionFiles(root string) map[string]struct{} {
	seen := map[string]struct{}{}
	visit := func(path string, entry os.DirEntry, walkErr error) error {
		switch {
		case walkErr != nil:
			// Unreadable entry: skip it and keep walking.
		case entry.IsDir():
			// Directories are never recorded, whatever their name.
		case strings.HasSuffix(path, ".jsonl"):
			seen[path] = struct{}{}
		}
		return nil
	}
	// visit never returns an error, so WalkDir's result carries no information.
	_ = filepath.WalkDir(root, visit)
	return seen
}
1148
1149
// codexSessionsRoot returns the directory that holds Codex session files:
// "$CODEX_HOME/sessions" when the CODEX_HOME environment variable is
// non-empty, otherwise ".codex/sessions" under the user's home directory.
//
// An error is returned only when the home directory is needed and cannot be
// determined.
func codexSessionsRoot() (string, error) {
	base := os.Getenv("CODEX_HOME")
	if base == "" {
		home, err := os.UserHomeDir()
		if err != nil {
			return "", err
		}
		base = filepath.Join(home, ".codex")
	}
	return filepath.Join(base, "sessions"), nil
}
1159
1160
// findSessionPathByThreadID walks root and returns the path of the first
// ".jsonl" file whose file name contains threadID.
//
// Only the file name is compared, never the directories leading to it, so an
// ID that happens to appear in root or in an intermediate directory name does
// not produce a false match. An empty threadID never matches; without that
// guard every file would qualify and an arbitrary session would be returned.
//
// It returns os.ErrNotExist when nothing matches. Entries that cannot be read
// during the walk are skipped.
func findSessionPathByThreadID(root, threadID string) (string, error) {
	if threadID == "" {
		return "", os.ErrNotExist
	}

	var match string
	err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error {
		if walkErr != nil || d.IsDir() {
			return nil
		}
		name := d.Name()
		if strings.HasSuffix(name, ".jsonl") && strings.Contains(name, threadID) {
			match = path
			// io.EOF is used purely as a sentinel to stop the walk early.
			return io.EOF
		}
		return nil
	})
	if err != nil && !errors.Is(err, io.EOF) {
		return "", err
	}
	if match == "" {
		return "", os.ErrNotExist
	}
	return match, nil
}
1180
1181
// findLatestSessionPath finds the .jsonl file in root that was created by our
1182
// subprocess. It uses a pre-existing file snapshot to skip sessions that
1183
// existed before the subprocess started, then filters by CWD and picks the
1184
// oldest new match. When preExisting is nil (reconnect), it falls back to
1185
// accepting any file whose timestamp is >= notBefore.
1186
func findLatestSessionPath(root, target string, notBefore time.Time, preExisting map[string]struct{}) (string, error) {
1187
type candidate struct {
1188
path string
1189
ts time.Time
1190
}
1191
var candidates []candidate
1192
1193
err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error {
1194
if walkErr != nil || d.IsDir() || !strings.HasSuffix(path, ".jsonl") {
1195
return nil
1196
}
1197
if preExisting != nil {
1198
if _, existed := preExisting[path]; existed {
1199
return nil
1200
}
1201
}
1202
meta, ts, err := readSessionMeta(path)
1203
if err != nil {
1204
return nil
1205
}
1206
if filepath.Clean(meta.Cwd) != target {
1207
return nil
1208
}
1209
if ts.Before(notBefore) {
1210
return nil
1211
}
1212
candidates = append(candidates, candidate{path: path, ts: ts})
1213
return nil
1214
})
1215
if err != nil {
1216
return "", err
1217
}
1218
if len(candidates) == 0 {
1219
return "", os.ErrNotExist
1220
}
1221
// Pick the oldest new session — the first file created after our
1222
// subprocess started is most likely ours.
1223
sort.Slice(candidates, func(i, j int) bool {
1224
return candidates[i].ts.Before(candidates[j].ts)
1225
})
1226
return candidates[0].path, nil
1227
}
1228
1229
// readSessionMeta reads the first line of the session file at path and
// decodes it as a sessionEnvelope whose Type must be "session_meta"; the
// envelope's Payload is then decoded into a sessionMetaPayload.
//
// The returned time is meta.Timestamp parsed as RFC 3339 with optional
// fractional seconds. When that fails the file's modification time is used
// instead, and if the file cannot be stat'ed the zero time is returned with a
// nil error.
//
// io.EOF is returned when the file is empty or its first record is not a
// session_meta envelope. Open, read and JSON decoding errors are returned
// unchanged.
func readSessionMeta(path string) (sessionMetaPayload, time.Time, error) {
	// bufio.Scanner refuses lines longer than 64 KiB unless given a larger
	// buffer limit. The metadata record is a single JSON line of unbounded
	// size, so the default would make long records fail with
	// bufio.ErrTooLong and the session would be silently skipped by callers
	// that ignore per-file errors.
	const maxMetaLineBytes = 16 << 20

	file, err := os.Open(path)
	if err != nil {
		return sessionMetaPayload{}, time.Time{}, err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), maxMetaLineBytes)
	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			return sessionMetaPayload{}, time.Time{}, err
		}
		// No error and no token: the file is empty.
		return sessionMetaPayload{}, time.Time{}, io.EOF
	}

	var env sessionEnvelope
	if err := json.Unmarshal(scanner.Bytes(), &env); err != nil {
		return sessionMetaPayload{}, time.Time{}, err
	}
	if env.Type != "session_meta" {
		return sessionMetaPayload{}, time.Time{}, io.EOF
	}

	var meta sessionMetaPayload
	if err := json.Unmarshal(env.Payload, &meta); err != nil {
		return sessionMetaPayload{}, time.Time{}, err
	}

	if ts, err := time.Parse(time.RFC3339Nano, meta.Timestamp); err == nil {
		return meta, ts, nil
	}

	// Missing or malformed timestamp: fall back to the file's mtime. A Stat
	// failure is deliberately not fatal; the metadata itself is still valid.
	info, err := file.Stat()
	if err != nil {
		return meta, time.Time{}, nil
	}
	return meta, info.ModTime(), nil
}
1266
1267
func isInteractiveTTY() bool {
1268
return term.IsTerminal(int(os.Stdin.Fd())) && term.IsTerminal(int(os.Stdout.Fd()))
1269
}
1270
1271
// boolString encodes v as "1" for true and "0" for false.
func boolString(v bool) string {
	flag := "0"
	if v {
		flag = "1"
	}
	return flag
}
1277
1278
// shouldRelaySession decides from the command-line arguments whether this
// invocation should be relayed.
//
// It returns false when any argument is one of "-h", "--help", "-V" or
// "--version", or when the first argument that does not start with "-" is
// "help" or "completion". In every other case, including an empty argument
// list, it returns true.
func shouldRelaySession(args []string) bool {
	var (
		firstCommand string
		haveCommand  bool
	)
	for _, arg := range args {
		switch {
		case arg == "-h", arg == "--help", arg == "-V", arg == "--version":
			// An informational flag anywhere on the line wins outright.
			return false
		case !haveCommand && !strings.HasPrefix(arg, "-"):
			firstCommand, haveCommand = arg, true
		}
	}
	// With no positional argument firstCommand stays "", which is neither of
	// the excluded subcommands.
	return firstCommand != "help" && firstCommand != "completion"
}
1300
1301
// exitStatus maps an error from running a command to a process exit code:
// 0 for a nil error, the code reported by the *exec.ExitError when err is or
// wraps one, and 1 for any other error.
func exitStatus(err error) int {
	var exitErr *exec.ExitError
	switch {
	case err == nil:
		return 0
	case errors.As(err, &exitErr):
		return exitErr.ExitCode()
	default:
		return 1
	}
}
1311
1312
// repoConfig mirrors the per-repository .scuttlebot.yaml file.
type repoConfig struct {
	// Channel is the single-channel form, read from the "channel" key.
	Channel string `yaml:"channel"`
	// Channels is the list form, read from the "channels" key.
	Channels []string `yaml:"channels"`
}

// allChannels returns every configured channel, with Channel (when set)
// placed ahead of the entries of Channels. When Channel is empty the Channels
// slice itself is returned, not a copy.
func (rc *repoConfig) allChannels() []string {
	if rc.Channel != "" {
		combined := make([]string, 0, len(rc.Channels)+1)
		combined = append(combined, rc.Channel)
		return append(combined, rc.Channels...)
	}
	return rc.Channels
}
1325
1326
// loadRepoConfig walks up from dir looking for .scuttlebot.yaml.
1327
// Stops at the git root (directory containing .git) or the filesystem root.
1328
// Returns nil, nil if no config file is found.
1329
func loadRepoConfig(dir string) (*repoConfig, error) {
1330
current := dir
1331
for {
1332
candidate := filepath.Join(current, ".scuttlebot.yaml")
1333
if data, err := os.ReadFile(candidate); err == nil {
1334
var rc repoConfig
1335
if err := yaml.Unmarshal(data, &rc); err != nil {
1336
return nil, fmt.Errorf("loadRepoConfig: parse %s: %w", candidate, err)
1337
}
1338
fmt.Fprintf(os.Stderr, "scuttlebot: loaded repo config from %s\n", candidate)
1339
return &rc, nil
1340
}
1341
1342
// Stop if this directory is a git root.
1343
if info, err := os.Stat(filepath.Join(current, ".git")); err == nil && info.IsDir() {
1344
return nil, nil
1345
}
1346
1347
parent := filepath.Dir(current)
1348
if parent == current {
1349
return nil, nil
1350
}
1351
current = parent
1352
}
1353
}
1354
1355
// mergeChannels returns a new slice holding every entry of existing followed
// by those entries of extra that occur neither in existing nor earlier in
// extra. Duplicates already inside existing are kept as they are, and neither
// input slice is modified.
func mergeChannels(existing, extra []string) []string {
	known := make(map[string]struct{}, len(existing))
	for i := range existing {
		known[existing[i]] = struct{}{}
	}

	// Copy so that appending can never write into the caller's backing array.
	result := append([]string(nil), existing...)
	for _, name := range extra {
		if _, dup := known[name]; !dup {
			known[name] = struct{}{}
			result = append(result, name)
		}
	}
	return result
}
1371

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button