ScuttleBot
Merge pull request #149 from ConflictHQ/feature/62-relay-dual-mirror feat: dual-path relay mirroring — PTY + session files for all 3 relays
Commit
3be3167b80b6990d1085ee7418669c5d60e5b8cd6d280f18b4a605351ea8f49c
Parent
4bb45bdcb34f63a…
6 files changed
+30
-7
+25
-6
+103
-13
+119
+128
+123
+30
-7
| --- cmd/claude-relay/main.go | ||
| +++ cmd/claude-relay/main.go | ||
| @@ -18,10 +18,11 @@ | ||
| 18 | 18 | "sync" |
| 19 | 19 | "syscall" |
| 20 | 20 | "time" |
| 21 | 21 | |
| 22 | 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | + "github.com/conflicthq/scuttlebot/pkg/relaymirror" | |
| 23 | 24 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 24 | 25 | "github.com/creack/pty" |
| 25 | 26 | "github.com/google/uuid" |
| 26 | 27 | "golang.org/x/term" |
| 27 | 28 | "gopkg.in/yaml.v3" |
| @@ -211,12 +212,17 @@ | ||
| 211 | 212 | "SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled), |
| 212 | 213 | "SCUTTLEBOT_SESSION_ID="+cfg.SessionID, |
| 213 | 214 | "SCUTTLEBOT_NICK="+cfg.Nick, |
| 214 | 215 | "SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive), |
| 215 | 216 | ) |
| 217 | + // Create PTY mirror early so session file loop can dedup against it. | |
| 218 | + var ptyMirror *relaymirror.PTYMirror | |
| 216 | 219 | if relayActive { |
| 217 | - go mirrorSessionLoop(ctx, relay, cfg, startedAt) | |
| 220 | + ptyMirror = relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) { | |
| 221 | + _ = relay.Post(ctx, line) | |
| 222 | + }) | |
| 223 | + go mirrorSessionLoop(ctx, relay, cfg, startedAt, ptyMirror) | |
| 218 | 224 | go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval) |
| 219 | 225 | } |
| 220 | 226 | |
| 221 | 227 | if !isInteractiveTTY() { |
| 222 | 228 | cmd.Stdin = os.Stdin |
| @@ -263,13 +269,25 @@ | ||
| 263 | 269 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 264 | 270 | |
| 265 | 271 | go func() { |
| 266 | 272 | _, _ = io.Copy(ptmx, os.Stdin) |
| 267 | 273 | }() |
| 268 | - go func() { | |
| 269 | - copyPTYOutput(ptmx, os.Stdout, state) | |
| 270 | - }() | |
| 274 | + // Wire PTY mirror for dual-path: real-time text to IRC + session file for metadata. | |
| 275 | + if ptyMirror != nil { | |
| 276 | + ptyMirror.BusyCallback = func(now time.Time) { | |
| 277 | + state.mu.Lock() | |
| 278 | + state.lastBusy = now | |
| 279 | + state.mu.Unlock() | |
| 280 | + } | |
| 281 | + go func() { | |
| 282 | + _ = ptyMirror.Copy(ptmx, os.Stdout) | |
| 283 | + }() | |
| 284 | + } else { | |
| 285 | + go func() { | |
| 286 | + copyPTYOutput(ptmx, os.Stdout, state) | |
| 287 | + }() | |
| 288 | + } | |
| 271 | 289 | if relayActive { |
| 272 | 290 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 273 | 291 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 274 | 292 | } |
| 275 | 293 | |
| @@ -283,28 +301,33 @@ | ||
| 283 | 301 | return err |
| 284 | 302 | } |
| 285 | 303 | |
| 286 | 304 | // --- Session mirroring --- |
| 287 | 305 | |
| 288 | -func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time) { | |
| 306 | +func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, ptyDedup *relaymirror.PTYMirror) { | |
| 289 | 307 | for { |
| 290 | 308 | if ctx.Err() != nil { |
| 291 | 309 | return |
| 292 | 310 | } |
| 293 | 311 | sessionPath, err := discoverSessionPath(ctx, cfg, startedAt) |
| 294 | 312 | if err != nil { |
| 295 | 313 | if ctx.Err() != nil { |
| 296 | 314 | return |
| 297 | 315 | } |
| 298 | - // Session not found yet — wait and retry instead of giving up. | |
| 316 | + fmt.Fprintf(os.Stderr, "claude-relay: session discovery failed: %v (retrying in 10s)\n", err) | |
| 299 | 317 | time.Sleep(10 * time.Second) |
| 300 | 318 | continue |
| 301 | 319 | } |
| 320 | + fmt.Fprintf(os.Stderr, "claude-relay: session file discovered: %s\n", sessionPath) | |
| 302 | 321 | if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, func(ml mirrorLine) { |
| 303 | 322 | for _, line := range splitMirrorText(ml.Text) { |
| 304 | 323 | if line == "" { |
| 305 | 324 | continue |
| 325 | + } | |
| 326 | + // Mark as seen so PTY mirror deduplicates. | |
| 327 | + if ptyDedup != nil { | |
| 328 | + ptyDedup.MarkSeen(line) | |
| 306 | 329 | } |
| 307 | 330 | if len(ml.Meta) > 0 { |
| 308 | 331 | _ = relay.PostWithMeta(ctx, line, ml.Meta) |
| 309 | 332 | } else { |
| 310 | 333 | _ = relay.Post(ctx, line) |
| @@ -748,11 +771,11 @@ | ||
| 748 | 771 | fmt.Fprintf(os.Stderr, "claude-relay: reconnected, restarting mirror and input loops\n") |
| 749 | 772 | |
| 750 | 773 | // Restart mirror and input loops with the new connector. |
| 751 | 774 | // Use epoch time for mirror so it finds the existing session file |
| 752 | 775 | // regardless of when it was last modified. |
| 753 | - go mirrorSessionLoop(ctx, conn, cfg, time.Time{}) | |
| 776 | + go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil) | |
| 754 | 777 | go relayInputLoop(ctx, conn, cfg, state, ptmx, now) |
| 755 | 778 | break |
| 756 | 779 | } |
| 757 | 780 | } |
| 758 | 781 | } |
| 759 | 782 |
| --- cmd/claude-relay/main.go | |
| +++ cmd/claude-relay/main.go | |
| @@ -18,10 +18,11 @@ | |
| 18 | "sync" |
| 19 | "syscall" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 24 | "github.com/creack/pty" |
| 25 | "github.com/google/uuid" |
| 26 | "golang.org/x/term" |
| 27 | "gopkg.in/yaml.v3" |
| @@ -211,12 +212,17 @@ | |
| 211 | "SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled), |
| 212 | "SCUTTLEBOT_SESSION_ID="+cfg.SessionID, |
| 213 | "SCUTTLEBOT_NICK="+cfg.Nick, |
| 214 | "SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive), |
| 215 | ) |
| 216 | if relayActive { |
| 217 | go mirrorSessionLoop(ctx, relay, cfg, startedAt) |
| 218 | go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval) |
| 219 | } |
| 220 | |
| 221 | if !isInteractiveTTY() { |
| 222 | cmd.Stdin = os.Stdin |
| @@ -263,13 +269,25 @@ | |
| 263 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 264 | |
| 265 | go func() { |
| 266 | _, _ = io.Copy(ptmx, os.Stdin) |
| 267 | }() |
| 268 | go func() { |
| 269 | copyPTYOutput(ptmx, os.Stdout, state) |
| 270 | }() |
| 271 | if relayActive { |
| 272 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 273 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 274 | } |
| 275 | |
| @@ -283,28 +301,33 @@ | |
| 283 | return err |
| 284 | } |
| 285 | |
| 286 | // --- Session mirroring --- |
| 287 | |
| 288 | func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time) { |
| 289 | for { |
| 290 | if ctx.Err() != nil { |
| 291 | return |
| 292 | } |
| 293 | sessionPath, err := discoverSessionPath(ctx, cfg, startedAt) |
| 294 | if err != nil { |
| 295 | if ctx.Err() != nil { |
| 296 | return |
| 297 | } |
| 298 | // Session not found yet — wait and retry instead of giving up. |
| 299 | time.Sleep(10 * time.Second) |
| 300 | continue |
| 301 | } |
| 302 | if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, func(ml mirrorLine) { |
| 303 | for _, line := range splitMirrorText(ml.Text) { |
| 304 | if line == "" { |
| 305 | continue |
| 306 | } |
| 307 | if len(ml.Meta) > 0 { |
| 308 | _ = relay.PostWithMeta(ctx, line, ml.Meta) |
| 309 | } else { |
| 310 | _ = relay.Post(ctx, line) |
| @@ -748,11 +771,11 @@ | |
| 748 | fmt.Fprintf(os.Stderr, "claude-relay: reconnected, restarting mirror and input loops\n") |
| 749 | |
| 750 | // Restart mirror and input loops with the new connector. |
| 751 | // Use epoch time for mirror so it finds the existing session file |
| 752 | // regardless of when it was last modified. |
| 753 | go mirrorSessionLoop(ctx, conn, cfg, time.Time{}) |
| 754 | go relayInputLoop(ctx, conn, cfg, state, ptmx, now) |
| 755 | break |
| 756 | } |
| 757 | } |
| 758 | } |
| 759 |
| --- cmd/claude-relay/main.go | |
| +++ cmd/claude-relay/main.go | |
| @@ -18,10 +18,11 @@ | |
| 18 | "sync" |
| 19 | "syscall" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | "github.com/conflicthq/scuttlebot/pkg/relaymirror" |
| 24 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 25 | "github.com/creack/pty" |
| 26 | "github.com/google/uuid" |
| 27 | "golang.org/x/term" |
| 28 | "gopkg.in/yaml.v3" |
| @@ -211,12 +212,17 @@ | |
| 212 | "SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled), |
| 213 | "SCUTTLEBOT_SESSION_ID="+cfg.SessionID, |
| 214 | "SCUTTLEBOT_NICK="+cfg.Nick, |
| 215 | "SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive), |
| 216 | ) |
| 217 | // Create PTY mirror early so session file loop can dedup against it. |
| 218 | var ptyMirror *relaymirror.PTYMirror |
| 219 | if relayActive { |
| 220 | ptyMirror = relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) { |
| 221 | _ = relay.Post(ctx, line) |
| 222 | }) |
| 223 | go mirrorSessionLoop(ctx, relay, cfg, startedAt, ptyMirror) |
| 224 | go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval) |
| 225 | } |
| 226 | |
| 227 | if !isInteractiveTTY() { |
| 228 | cmd.Stdin = os.Stdin |
| @@ -263,13 +269,25 @@ | |
| 269 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 270 | |
| 271 | go func() { |
| 272 | _, _ = io.Copy(ptmx, os.Stdin) |
| 273 | }() |
| 274 | // Wire PTY mirror for dual-path: real-time text to IRC + session file for metadata. |
| 275 | if ptyMirror != nil { |
| 276 | ptyMirror.BusyCallback = func(now time.Time) { |
| 277 | state.mu.Lock() |
| 278 | state.lastBusy = now |
| 279 | state.mu.Unlock() |
| 280 | } |
| 281 | go func() { |
| 282 | _ = ptyMirror.Copy(ptmx, os.Stdout) |
| 283 | }() |
| 284 | } else { |
| 285 | go func() { |
| 286 | copyPTYOutput(ptmx, os.Stdout, state) |
| 287 | }() |
| 288 | } |
| 289 | if relayActive { |
| 290 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 291 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 292 | } |
| 293 | |
| @@ -283,28 +301,33 @@ | |
| 301 | return err |
| 302 | } |
| 303 | |
| 304 | // --- Session mirroring --- |
| 305 | |
| 306 | func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, ptyDedup *relaymirror.PTYMirror) { |
| 307 | for { |
| 308 | if ctx.Err() != nil { |
| 309 | return |
| 310 | } |
| 311 | sessionPath, err := discoverSessionPath(ctx, cfg, startedAt) |
| 312 | if err != nil { |
| 313 | if ctx.Err() != nil { |
| 314 | return |
| 315 | } |
| 316 | fmt.Fprintf(os.Stderr, "claude-relay: session discovery failed: %v (retrying in 10s)\n", err) |
| 317 | time.Sleep(10 * time.Second) |
| 318 | continue |
| 319 | } |
| 320 | fmt.Fprintf(os.Stderr, "claude-relay: session file discovered: %s\n", sessionPath) |
| 321 | if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, func(ml mirrorLine) { |
| 322 | for _, line := range splitMirrorText(ml.Text) { |
| 323 | if line == "" { |
| 324 | continue |
| 325 | } |
| 326 | // Mark as seen so PTY mirror deduplicates. |
| 327 | if ptyDedup != nil { |
| 328 | ptyDedup.MarkSeen(line) |
| 329 | } |
| 330 | if len(ml.Meta) > 0 { |
| 331 | _ = relay.PostWithMeta(ctx, line, ml.Meta) |
| 332 | } else { |
| 333 | _ = relay.Post(ctx, line) |
| @@ -748,11 +771,11 @@ | |
| 771 | fmt.Fprintf(os.Stderr, "claude-relay: reconnected, restarting mirror and input loops\n") |
| 772 | |
| 773 | // Restart mirror and input loops with the new connector. |
| 774 | // Use epoch time for mirror so it finds the existing session file |
| 775 | // regardless of when it was last modified. |
| 776 | go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil) |
| 777 | go relayInputLoop(ctx, conn, cfg, state, ptmx, now) |
| 778 | break |
| 779 | } |
| 780 | } |
| 781 | } |
| 782 |
+25
-6
| --- cmd/codex-relay/main.go | ||
| +++ cmd/codex-relay/main.go | ||
| @@ -18,10 +18,11 @@ | ||
| 18 | 18 | "sync" |
| 19 | 19 | "syscall" |
| 20 | 20 | "time" |
| 21 | 21 | |
| 22 | 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | + "github.com/conflicthq/scuttlebot/pkg/relaymirror" | |
| 23 | 24 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 24 | 25 | "github.com/creack/pty" |
| 25 | 26 | "golang.org/x/term" |
| 26 | 27 | "gopkg.in/yaml.v3" |
| 27 | 28 | ) |
| @@ -227,12 +228,16 @@ | ||
| 227 | 228 | "SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled), |
| 228 | 229 | "SCUTTLEBOT_SESSION_ID="+cfg.SessionID, |
| 229 | 230 | "SCUTTLEBOT_NICK="+cfg.Nick, |
| 230 | 231 | "SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive), |
| 231 | 232 | ) |
| 233 | + var ptyMirror *relaymirror.PTYMirror | |
| 232 | 234 | if relayActive { |
| 233 | - go mirrorSessionLoop(ctx, relay, cfg, startedAt, preExisting) | |
| 235 | + ptyMirror = relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) { | |
| 236 | + _ = relay.Post(ctx, line) | |
| 237 | + }) | |
| 238 | + go mirrorSessionLoop(ctx, relay, cfg, startedAt, preExisting, ptyMirror) | |
| 234 | 239 | go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval) |
| 235 | 240 | } |
| 236 | 241 | |
| 237 | 242 | if !isInteractiveTTY() { |
| 238 | 243 | cmd.Stdin = os.Stdin |
| @@ -279,13 +284,24 @@ | ||
| 279 | 284 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 280 | 285 | |
| 281 | 286 | go func() { |
| 282 | 287 | _, _ = io.Copy(ptmx, os.Stdin) |
| 283 | 288 | }() |
| 284 | - go func() { | |
| 285 | - copyPTYOutput(ptmx, os.Stdout, state) | |
| 286 | - }() | |
| 289 | + if ptyMirror != nil { | |
| 290 | + ptyMirror.BusyCallback = func(now time.Time) { | |
| 291 | + state.mu.Lock() | |
| 292 | + state.lastBusy = now | |
| 293 | + state.mu.Unlock() | |
| 294 | + } | |
| 295 | + go func() { | |
| 296 | + _ = ptyMirror.Copy(ptmx, os.Stdout) | |
| 297 | + }() | |
| 298 | + } else { | |
| 299 | + go func() { | |
| 300 | + copyPTYOutput(ptmx, os.Stdout, state) | |
| 301 | + }() | |
| 302 | + } | |
| 287 | 303 | if relayActive { |
| 288 | 304 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 289 | 305 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 290 | 306 | } |
| 291 | 307 | |
| @@ -408,11 +424,11 @@ | ||
| 408 | 424 | fmt.Fprintf(os.Stderr, "codex-relay: reconnected, restarting mirror and input loops\n") |
| 409 | 425 | |
| 410 | 426 | // Restart mirror and input loops with the new connector. |
| 411 | 427 | // Use epoch time for mirror so it finds the existing session file |
| 412 | 428 | // regardless of when it was last modified. |
| 413 | - go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil) | |
| 429 | + go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil, nil) | |
| 414 | 430 | go relayInputLoop(ctx, conn, cfg, state, ptmx, now) |
| 415 | 431 | break |
| 416 | 432 | } |
| 417 | 433 | } |
| 418 | 434 | } |
| @@ -809,11 +825,11 @@ | ||
| 809 | 825 | func defaultSessionID(target string) string { |
| 810 | 826 | sum := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s|%d|%d|%d", target, os.Getpid(), os.Getppid(), time.Now().UnixNano()))) |
| 811 | 827 | return fmt.Sprintf("%08x", sum) |
| 812 | 828 | } |
| 813 | 829 | |
| 814 | -func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, preExisting map[string]struct{}) { | |
| 830 | +func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, preExisting map[string]struct{}, ptyDedup *relaymirror.PTYMirror) { | |
| 815 | 831 | for { |
| 816 | 832 | if ctx.Err() != nil { |
| 817 | 833 | return |
| 818 | 834 | } |
| 819 | 835 | sessionPath, err := discoverSessionPath(ctx, cfg, startedAt, preExisting) |
| @@ -827,10 +843,13 @@ | ||
| 827 | 843 | if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, func(ml mirrorLine) { |
| 828 | 844 | for _, line := range splitMirrorText(ml.Text) { |
| 829 | 845 | if line == "" { |
| 830 | 846 | continue |
| 831 | 847 | } |
| 848 | + if ptyDedup != nil { | |
| 849 | + ptyDedup.MarkSeen(line) | |
| 850 | + } | |
| 832 | 851 | if len(ml.Meta) > 0 { |
| 833 | 852 | _ = relay.PostWithMeta(ctx, line, ml.Meta) |
| 834 | 853 | } else { |
| 835 | 854 | _ = relay.Post(ctx, line) |
| 836 | 855 | } |
| 837 | 856 |
| --- cmd/codex-relay/main.go | |
| +++ cmd/codex-relay/main.go | |
| @@ -18,10 +18,11 @@ | |
| 18 | "sync" |
| 19 | "syscall" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 24 | "github.com/creack/pty" |
| 25 | "golang.org/x/term" |
| 26 | "gopkg.in/yaml.v3" |
| 27 | ) |
| @@ -227,12 +228,16 @@ | |
| 227 | "SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled), |
| 228 | "SCUTTLEBOT_SESSION_ID="+cfg.SessionID, |
| 229 | "SCUTTLEBOT_NICK="+cfg.Nick, |
| 230 | "SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive), |
| 231 | ) |
| 232 | if relayActive { |
| 233 | go mirrorSessionLoop(ctx, relay, cfg, startedAt, preExisting) |
| 234 | go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval) |
| 235 | } |
| 236 | |
| 237 | if !isInteractiveTTY() { |
| 238 | cmd.Stdin = os.Stdin |
| @@ -279,13 +284,24 @@ | |
| 279 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 280 | |
| 281 | go func() { |
| 282 | _, _ = io.Copy(ptmx, os.Stdin) |
| 283 | }() |
| 284 | go func() { |
| 285 | copyPTYOutput(ptmx, os.Stdout, state) |
| 286 | }() |
| 287 | if relayActive { |
| 288 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 289 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 290 | } |
| 291 | |
| @@ -408,11 +424,11 @@ | |
| 408 | fmt.Fprintf(os.Stderr, "codex-relay: reconnected, restarting mirror and input loops\n") |
| 409 | |
| 410 | // Restart mirror and input loops with the new connector. |
| 411 | // Use epoch time for mirror so it finds the existing session file |
| 412 | // regardless of when it was last modified. |
| 413 | go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil) |
| 414 | go relayInputLoop(ctx, conn, cfg, state, ptmx, now) |
| 415 | break |
| 416 | } |
| 417 | } |
| 418 | } |
| @@ -809,11 +825,11 @@ | |
| 809 | func defaultSessionID(target string) string { |
| 810 | sum := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s|%d|%d|%d", target, os.Getpid(), os.Getppid(), time.Now().UnixNano()))) |
| 811 | return fmt.Sprintf("%08x", sum) |
| 812 | } |
| 813 | |
| 814 | func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, preExisting map[string]struct{}) { |
| 815 | for { |
| 816 | if ctx.Err() != nil { |
| 817 | return |
| 818 | } |
| 819 | sessionPath, err := discoverSessionPath(ctx, cfg, startedAt, preExisting) |
| @@ -827,10 +843,13 @@ | |
| 827 | if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, func(ml mirrorLine) { |
| 828 | for _, line := range splitMirrorText(ml.Text) { |
| 829 | if line == "" { |
| 830 | continue |
| 831 | } |
| 832 | if len(ml.Meta) > 0 { |
| 833 | _ = relay.PostWithMeta(ctx, line, ml.Meta) |
| 834 | } else { |
| 835 | _ = relay.Post(ctx, line) |
| 836 | } |
| 837 |
| --- cmd/codex-relay/main.go | |
| +++ cmd/codex-relay/main.go | |
| @@ -18,10 +18,11 @@ | |
| 18 | "sync" |
| 19 | "syscall" |
| 20 | "time" |
| 21 | |
| 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | "github.com/conflicthq/scuttlebot/pkg/relaymirror" |
| 24 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 25 | "github.com/creack/pty" |
| 26 | "golang.org/x/term" |
| 27 | "gopkg.in/yaml.v3" |
| 28 | ) |
| @@ -227,12 +228,16 @@ | |
| 228 | "SCUTTLEBOT_HOOKS_ENABLED="+boolString(cfg.HooksEnabled), |
| 229 | "SCUTTLEBOT_SESSION_ID="+cfg.SessionID, |
| 230 | "SCUTTLEBOT_NICK="+cfg.Nick, |
| 231 | "SCUTTLEBOT_ACTIVITY_VIA_BROKER="+boolString(relayActive), |
| 232 | ) |
| 233 | var ptyMirror *relaymirror.PTYMirror |
| 234 | if relayActive { |
| 235 | ptyMirror = relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) { |
| 236 | _ = relay.Post(ctx, line) |
| 237 | }) |
| 238 | go mirrorSessionLoop(ctx, relay, cfg, startedAt, preExisting, ptyMirror) |
| 239 | go presenceLoopPtr(ctx, &relay, cfg.HeartbeatInterval) |
| 240 | } |
| 241 | |
| 242 | if !isInteractiveTTY() { |
| 243 | cmd.Stdin = os.Stdin |
| @@ -279,13 +284,24 @@ | |
| 284 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 285 | |
| 286 | go func() { |
| 287 | _, _ = io.Copy(ptmx, os.Stdin) |
| 288 | }() |
| 289 | if ptyMirror != nil { |
| 290 | ptyMirror.BusyCallback = func(now time.Time) { |
| 291 | state.mu.Lock() |
| 292 | state.lastBusy = now |
| 293 | state.mu.Unlock() |
| 294 | } |
| 295 | go func() { |
| 296 | _ = ptyMirror.Copy(ptmx, os.Stdout) |
| 297 | }() |
| 298 | } else { |
| 299 | go func() { |
| 300 | copyPTYOutput(ptmx, os.Stdout, state) |
| 301 | }() |
| 302 | } |
| 303 | if relayActive { |
| 304 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 305 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 306 | } |
| 307 | |
| @@ -408,11 +424,11 @@ | |
| 424 | fmt.Fprintf(os.Stderr, "codex-relay: reconnected, restarting mirror and input loops\n") |
| 425 | |
| 426 | // Restart mirror and input loops with the new connector. |
| 427 | // Use epoch time for mirror so it finds the existing session file |
| 428 | // regardless of when it was last modified. |
| 429 | go mirrorSessionLoop(ctx, conn, cfg, time.Time{}, nil, nil) |
| 430 | go relayInputLoop(ctx, conn, cfg, state, ptmx, now) |
| 431 | break |
| 432 | } |
| 433 | } |
| 434 | } |
| @@ -809,11 +825,11 @@ | |
| 825 | func defaultSessionID(target string) string { |
| 826 | sum := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s|%d|%d|%d", target, os.Getpid(), os.Getppid(), time.Now().UnixNano()))) |
| 827 | return fmt.Sprintf("%08x", sum) |
| 828 | } |
| 829 | |
| 830 | func mirrorSessionLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, startedAt time.Time, preExisting map[string]struct{}, ptyDedup *relaymirror.PTYMirror) { |
| 831 | for { |
| 832 | if ctx.Err() != nil { |
| 833 | return |
| 834 | } |
| 835 | sessionPath, err := discoverSessionPath(ctx, cfg, startedAt, preExisting) |
| @@ -827,10 +843,13 @@ | |
| 843 | if err := tailSessionFile(ctx, sessionPath, cfg.MirrorReasoning, func(ml mirrorLine) { |
| 844 | for _, line := range splitMirrorText(ml.Text) { |
| 845 | if line == "" { |
| 846 | continue |
| 847 | } |
| 848 | if ptyDedup != nil { |
| 849 | ptyDedup.MarkSeen(line) |
| 850 | } |
| 851 | if len(ml.Meta) > 0 { |
| 852 | _ = relay.PostWithMeta(ctx, line, ml.Meta) |
| 853 | } else { |
| 854 | _ = relay.Post(ctx, line) |
| 855 | } |
| 856 |
+103
-13
| --- cmd/gemini-relay/main.go | ||
| +++ cmd/gemini-relay/main.go | ||
| @@ -14,31 +14,35 @@ | ||
| 14 | 14 | "sort" |
| 15 | 15 | "strings" |
| 16 | 16 | "sync" |
| 17 | 17 | "syscall" |
| 18 | 18 | "time" |
| 19 | + | |
| 20 | + "encoding/json" | |
| 19 | 21 | |
| 20 | 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | + "github.com/conflicthq/scuttlebot/pkg/relaymirror" | |
| 21 | 24 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 22 | 25 | "github.com/creack/pty" |
| 23 | 26 | "golang.org/x/term" |
| 24 | 27 | "gopkg.in/yaml.v3" |
| 25 | 28 | ) |
| 26 | 29 | |
| 27 | 30 | const ( |
| 28 | - defaultRelayURL = "http://localhost:8080" | |
| 29 | - defaultIRCAddr = "127.0.0.1:6667" | |
| 30 | - defaultChannel = "general" | |
| 31 | - defaultTransport = sessionrelay.TransportHTTP | |
| 32 | - defaultPollInterval = 2 * time.Second | |
| 33 | - defaultConnectWait = 10 * time.Second | |
| 34 | - defaultInjectDelay = 150 * time.Millisecond | |
| 35 | - defaultBusyWindow = 1500 * time.Millisecond | |
| 36 | - defaultHeartbeat = 60 * time.Second | |
| 37 | - defaultConfigFile = ".config/scuttlebot-relay.env" | |
| 38 | - bracketedPasteStart = "\x1b[200~" | |
| 39 | - bracketedPasteEnd = "\x1b[201~" | |
| 31 | + defaultRelayURL = "http://localhost:8080" | |
| 32 | + defaultIRCAddr = "127.0.0.1:6667" | |
| 33 | + defaultChannel = "general" | |
| 34 | + defaultTransport = sessionrelay.TransportHTTP | |
| 35 | + defaultPollInterval = 2 * time.Second | |
| 36 | + defaultConnectWait = 10 * time.Second | |
| 37 | + defaultInjectDelay = 150 * time.Millisecond | |
| 38 | + defaultBusyWindow = 1500 * time.Millisecond | |
| 39 | + defaultMirrorLineMax = 360 | |
| 40 | + defaultHeartbeat = 60 * time.Second | |
| 41 | + defaultConfigFile = ".config/scuttlebot-relay.env" | |
| 42 | + bracketedPasteStart = "\x1b[200~" | |
| 43 | + bracketedPasteEnd = "\x1b[201~" | |
| 40 | 44 | ) |
| 41 | 45 | |
| 42 | 46 | var serviceBots = map[string]struct{}{ |
| 43 | 47 | "bridge": {}, |
| 44 | 48 | "oracle": {}, |
| @@ -215,14 +219,27 @@ | ||
| 215 | 219 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 216 | 220 | |
| 217 | 221 | go func() { |
| 218 | 222 | _, _ = io.Copy(ptmx, os.Stdin) |
| 219 | 223 | }() |
| 224 | + // Dual-path mirroring: PTY for real-time text + session file for metadata. | |
| 225 | + ptyMirror := relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) { | |
| 226 | + if relayActive { | |
| 227 | + _ = relay.Post(ctx, line) | |
| 228 | + } | |
| 229 | + }) | |
| 230 | + ptyMirror.BusyCallback = func(now time.Time) { | |
| 231 | + state.mu.Lock() | |
| 232 | + state.lastBusy = now | |
| 233 | + state.mu.Unlock() | |
| 234 | + } | |
| 220 | 235 | go func() { |
| 221 | - copyPTYOutput(ptmx, os.Stdout, state) | |
| 236 | + _ = ptyMirror.Copy(ptmx, os.Stdout) | |
| 222 | 237 | }() |
| 223 | 238 | if relayActive { |
| 239 | + // Start Gemini session file tailing for structured metadata. | |
| 240 | + go geminiSessionMirrorLoop(ctx, relay, cfg, ptyMirror) | |
| 224 | 241 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 225 | 242 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 226 | 243 | } |
| 227 | 244 | |
| 228 | 245 | err = cmd.Wait() |
| @@ -502,10 +519,83 @@ | ||
| 502 | 519 | s.mu.RLock() |
| 503 | 520 | lastBusy := s.lastBusy |
| 504 | 521 | s.mu.RUnlock() |
| 505 | 522 | return !lastBusy.IsZero() && now.Sub(lastBusy) <= defaultBusyWindow |
| 506 | 523 | } |
| 524 | + | |
| 525 | +// geminiSessionMirrorLoop discovers and polls a Gemini CLI session file | |
| 526 | +// for structured tool call metadata, emitting it via PostWithMeta. | |
| 527 | +func geminiSessionMirrorLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, ptyDedup *relaymirror.PTYMirror) { | |
| 528 | + // Discover the Gemini session file directory. | |
| 529 | + home, err := os.UserHomeDir() | |
| 530 | + if err != nil { | |
| 531 | + fmt.Fprintf(os.Stderr, "gemini-relay: session mirror: %v\n", err) | |
| 532 | + return | |
| 533 | + } | |
| 534 | + chatsDir := filepath.Join(home, ".gemini", "tmp", slugify(cfg.TargetCWD), "chats") | |
| 535 | + if err := os.MkdirAll(chatsDir, 0755); err != nil { | |
| 536 | + // Directory doesn't exist yet — Gemini CLI creates it on first run. | |
| 537 | + } | |
| 538 | + existing := relaymirror.SnapshotDir(chatsDir) | |
| 539 | + | |
| 540 | + // Wait for a new session file. | |
| 541 | + watcher := relaymirror.NewSessionWatcher(chatsDir, "session-", 60*time.Second) | |
| 542 | + sessionPath, err := watcher.Discover(ctx, existing) | |
| 543 | + if err != nil { | |
| 544 | + if ctx.Err() == nil { | |
| 545 | + fmt.Fprintf(os.Stderr, "gemini-relay: session discovery: %v\n", err) | |
| 546 | + } | |
| 547 | + return | |
| 548 | + } | |
| 549 | + fmt.Fprintf(os.Stderr, "gemini-relay: session file discovered: %s\n", sessionPath) | |
| 550 | + | |
| 551 | + // Poll the session file for new messages. | |
| 552 | + msgIdx := 0 | |
| 553 | + tick := time.NewTicker(2 * time.Second) | |
| 554 | + defer tick.Stop() | |
| 555 | + for { | |
| 556 | + select { | |
| 557 | + case <-ctx.Done(): | |
| 558 | + return | |
| 559 | + case <-tick.C: | |
| 560 | + msgs, newIdx, err := relaymirror.PollGeminiSession(sessionPath, msgIdx) | |
| 561 | + if err != nil { | |
| 562 | + continue | |
| 563 | + } | |
| 564 | + msgIdx = newIdx | |
| 565 | + for _, msg := range msgs { | |
| 566 | + if msg.Type != "gemini" { | |
| 567 | + continue | |
| 568 | + } | |
| 569 | + for _, tc := range msg.ToolCalls { | |
| 570 | + meta, _ := json.Marshal(map[string]any{ | |
| 571 | + "type": "tool_result", | |
| 572 | + "data": map[string]any{ | |
| 573 | + "tool": tc.Name, | |
| 574 | + "status": tc.Status, | |
| 575 | + "args": tc.Args, | |
| 576 | + }, | |
| 577 | + }) | |
| 578 | + text := fmt.Sprintf("[%s] %s", tc.Name, tc.Status) | |
| 579 | + if ptyDedup != nil { | |
| 580 | + ptyDedup.MarkSeen(text) | |
| 581 | + } | |
| 582 | + _ = relay.PostWithMeta(ctx, text, meta) | |
| 583 | + } | |
| 584 | + } | |
| 585 | + } | |
| 586 | + } | |
| 587 | +} | |
| 588 | + | |
| 589 | +func slugify(s string) string { | |
| 590 | + s = strings.ReplaceAll(s, "/", "-") | |
| 591 | + s = strings.TrimPrefix(s, "-") | |
| 592 | + if s == "" { | |
| 593 | + return "default" | |
| 594 | + } | |
| 595 | + return s | |
| 596 | +} | |
| 507 | 597 | |
| 508 | 598 | func filterMessages(messages []message, since time.Time, nick, agentType string) ([]message, time.Time) { |
| 509 | 599 | filtered := make([]message, 0, len(messages)) |
| 510 | 600 | newest := since |
| 511 | 601 | for _, msg := range messages { |
| 512 | 602 | |
| 513 | 603 | ADDED pkg/relaymirror/pty.go |
| 514 | 604 | ADDED pkg/relaymirror/pty_test.go |
| 515 | 605 | ADDED pkg/relaymirror/session.go |
| --- cmd/gemini-relay/main.go | |
| +++ cmd/gemini-relay/main.go | |
| @@ -14,31 +14,35 @@ | |
| 14 | "sort" |
| 15 | "strings" |
| 16 | "sync" |
| 17 | "syscall" |
| 18 | "time" |
| 19 | |
| 20 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 21 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 22 | "github.com/creack/pty" |
| 23 | "golang.org/x/term" |
| 24 | "gopkg.in/yaml.v3" |
| 25 | ) |
| 26 | |
| 27 | const ( |
| 28 | defaultRelayURL = "http://localhost:8080" |
| 29 | defaultIRCAddr = "127.0.0.1:6667" |
| 30 | defaultChannel = "general" |
| 31 | defaultTransport = sessionrelay.TransportHTTP |
| 32 | defaultPollInterval = 2 * time.Second |
| 33 | defaultConnectWait = 10 * time.Second |
| 34 | defaultInjectDelay = 150 * time.Millisecond |
| 35 | defaultBusyWindow = 1500 * time.Millisecond |
| 36 | defaultHeartbeat = 60 * time.Second |
| 37 | defaultConfigFile = ".config/scuttlebot-relay.env" |
| 38 | bracketedPasteStart = "\x1b[200~" |
| 39 | bracketedPasteEnd = "\x1b[201~" |
| 40 | ) |
| 41 | |
| 42 | var serviceBots = map[string]struct{}{ |
| 43 | "bridge": {}, |
| 44 | "oracle": {}, |
| @@ -215,14 +219,27 @@ | |
| 215 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 216 | |
| 217 | go func() { |
| 218 | _, _ = io.Copy(ptmx, os.Stdin) |
| 219 | }() |
| 220 | go func() { |
| 221 | copyPTYOutput(ptmx, os.Stdout, state) |
| 222 | }() |
| 223 | if relayActive { |
| 224 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 225 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 226 | } |
| 227 | |
| 228 | err = cmd.Wait() |
| @@ -502,10 +519,83 @@ | |
| 502 | s.mu.RLock() |
| 503 | lastBusy := s.lastBusy |
| 504 | s.mu.RUnlock() |
| 505 | return !lastBusy.IsZero() && now.Sub(lastBusy) <= defaultBusyWindow |
| 506 | } |
| 507 | |
| 508 | func filterMessages(messages []message, since time.Time, nick, agentType string) ([]message, time.Time) { |
| 509 | filtered := make([]message, 0, len(messages)) |
| 510 | newest := since |
| 511 | for _, msg := range messages { |
| 512 | |
| 513 | ADDED pkg/relaymirror/pty.go |
| 514 | ADDED pkg/relaymirror/pty_test.go |
| 515 | ADDED pkg/relaymirror/session.go |
| --- cmd/gemini-relay/main.go | |
| +++ cmd/gemini-relay/main.go | |
| @@ -14,31 +14,35 @@ | |
| 14 | "sort" |
| 15 | "strings" |
| 16 | "sync" |
| 17 | "syscall" |
| 18 | "time" |
| 19 | |
| 20 | "encoding/json" |
| 21 | |
| 22 | "github.com/conflicthq/scuttlebot/pkg/ircagent" |
| 23 | "github.com/conflicthq/scuttlebot/pkg/relaymirror" |
| 24 | "github.com/conflicthq/scuttlebot/pkg/sessionrelay" |
| 25 | "github.com/creack/pty" |
| 26 | "golang.org/x/term" |
| 27 | "gopkg.in/yaml.v3" |
| 28 | ) |
| 29 | |
| 30 | const ( |
| 31 | defaultRelayURL = "http://localhost:8080" |
| 32 | defaultIRCAddr = "127.0.0.1:6667" |
| 33 | defaultChannel = "general" |
| 34 | defaultTransport = sessionrelay.TransportHTTP |
| 35 | defaultPollInterval = 2 * time.Second |
| 36 | defaultConnectWait = 10 * time.Second |
| 37 | defaultInjectDelay = 150 * time.Millisecond |
| 38 | defaultBusyWindow = 1500 * time.Millisecond |
| 39 | defaultMirrorLineMax = 360 |
| 40 | defaultHeartbeat = 60 * time.Second |
| 41 | defaultConfigFile = ".config/scuttlebot-relay.env" |
| 42 | bracketedPasteStart = "\x1b[200~" |
| 43 | bracketedPasteEnd = "\x1b[201~" |
| 44 | ) |
| 45 | |
| 46 | var serviceBots = map[string]struct{}{ |
| 47 | "bridge": {}, |
| 48 | "oracle": {}, |
| @@ -215,14 +219,27 @@ | |
| 219 | defer func() { _ = term.Restore(int(os.Stdin.Fd()), oldState) }() |
| 220 | |
| 221 | go func() { |
| 222 | _, _ = io.Copy(ptmx, os.Stdin) |
| 223 | }() |
| 224 | // Dual-path mirroring: PTY for real-time text + session file for metadata. |
| 225 | ptyMirror := relaymirror.NewPTYMirror(defaultMirrorLineMax, 500*time.Millisecond, func(line string) { |
| 226 | if relayActive { |
| 227 | _ = relay.Post(ctx, line) |
| 228 | } |
| 229 | }) |
| 230 | ptyMirror.BusyCallback = func(now time.Time) { |
| 231 | state.mu.Lock() |
| 232 | state.lastBusy = now |
| 233 | state.mu.Unlock() |
| 234 | } |
| 235 | go func() { |
| 236 | _ = ptyMirror.Copy(ptmx, os.Stdout) |
| 237 | }() |
| 238 | if relayActive { |
| 239 | // Start Gemini session file tailing for structured metadata. |
| 240 | go geminiSessionMirrorLoop(ctx, relay, cfg, ptyMirror) |
| 241 | go relayInputLoop(ctx, relay, cfg, state, ptmx, onlineAt) |
| 242 | go handleReconnectSignal(ctx, &relay, cfg, state, ptmx, startedAt) |
| 243 | } |
| 244 | |
| 245 | err = cmd.Wait() |
| @@ -502,10 +519,83 @@ | |
| 519 | s.mu.RLock() |
| 520 | lastBusy := s.lastBusy |
| 521 | s.mu.RUnlock() |
| 522 | return !lastBusy.IsZero() && now.Sub(lastBusy) <= defaultBusyWindow |
| 523 | } |
| 524 | |
| 525 | // geminiSessionMirrorLoop discovers and polls a Gemini CLI session file |
| 526 | // for structured tool call metadata, emitting it via PostWithMeta. |
| 527 | func geminiSessionMirrorLoop(ctx context.Context, relay sessionrelay.Connector, cfg config, ptyDedup *relaymirror.PTYMirror) { |
| 528 | // Discover the Gemini session file directory. |
| 529 | home, err := os.UserHomeDir() |
| 530 | if err != nil { |
| 531 | fmt.Fprintf(os.Stderr, "gemini-relay: session mirror: %v\n", err) |
| 532 | return |
| 533 | } |
| 534 | chatsDir := filepath.Join(home, ".gemini", "tmp", slugify(cfg.TargetCWD), "chats") |
| 535 | if err := os.MkdirAll(chatsDir, 0755); err != nil { |
| 536 | // Directory doesn't exist yet — Gemini CLI creates it on first run. |
| 537 | } |
| 538 | existing := relaymirror.SnapshotDir(chatsDir) |
| 539 | |
| 540 | // Wait for a new session file. |
| 541 | watcher := relaymirror.NewSessionWatcher(chatsDir, "session-", 60*time.Second) |
| 542 | sessionPath, err := watcher.Discover(ctx, existing) |
| 543 | if err != nil { |
| 544 | if ctx.Err() == nil { |
| 545 | fmt.Fprintf(os.Stderr, "gemini-relay: session discovery: %v\n", err) |
| 546 | } |
| 547 | return |
| 548 | } |
| 549 | fmt.Fprintf(os.Stderr, "gemini-relay: session file discovered: %s\n", sessionPath) |
| 550 | |
| 551 | // Poll the session file for new messages. |
| 552 | msgIdx := 0 |
| 553 | tick := time.NewTicker(2 * time.Second) |
| 554 | defer tick.Stop() |
| 555 | for { |
| 556 | select { |
| 557 | case <-ctx.Done(): |
| 558 | return |
| 559 | case <-tick.C: |
| 560 | msgs, newIdx, err := relaymirror.PollGeminiSession(sessionPath, msgIdx) |
| 561 | if err != nil { |
| 562 | continue |
| 563 | } |
| 564 | msgIdx = newIdx |
| 565 | for _, msg := range msgs { |
| 566 | if msg.Type != "gemini" { |
| 567 | continue |
| 568 | } |
| 569 | for _, tc := range msg.ToolCalls { |
| 570 | meta, _ := json.Marshal(map[string]any{ |
| 571 | "type": "tool_result", |
| 572 | "data": map[string]any{ |
| 573 | "tool": tc.Name, |
| 574 | "status": tc.Status, |
| 575 | "args": tc.Args, |
| 576 | }, |
| 577 | }) |
| 578 | text := fmt.Sprintf("[%s] %s", tc.Name, tc.Status) |
| 579 | if ptyDedup != nil { |
| 580 | ptyDedup.MarkSeen(text) |
| 581 | } |
| 582 | _ = relay.PostWithMeta(ctx, text, meta) |
| 583 | } |
| 584 | } |
| 585 | } |
| 586 | } |
| 587 | } |
| 588 | |
// slugify flattens a slash-separated path into a single directory name:
// every "/" becomes "-", one leading "-" is dropped, and an empty result is
// reported as "default".
func slugify(s string) string {
	if slug := strings.TrimPrefix(strings.ReplaceAll(s, "/", "-"), "-"); slug != "" {
		return slug
	}
	return "default"
}
| 597 | |
| 598 | func filterMessages(messages []message, since time.Time, nick, agentType string) ([]message, time.Time) { |
| 599 | filtered := make([]message, 0, len(messages)) |
| 600 | newest := since |
| 601 | for _, msg := range messages { |
| 602 | |
| 603 | DDED pkg/relaymirror/pty.go |
| 604 | DDED pkg/relaymirror/pty_test.go |
| 605 | DDED pkg/relaymirror/session.go |
+119
| --- a/pkg/relaymirror/pty.go | ||
| +++ b/pkg/relaymirror/pty.go | ||
| @@ -0,0 +1,119 @@ | ||
| 1 | +// Package relaymirror provides shared PTY output mirroring for relay binaries. | |
| 2 | +// | |
| 3 | +// PTYMirror reads from a PTY file descriptor and emits lines to a callback. | |
| 4 | +// It handles ANSI escape stripping and line buffering for clean IRC output. | |
| 5 | +package relaymirror | |
| 6 | + | |
| 7 | +import ( | |
| 8 | + "bytes" | |
| 9 | + "io" | |
| 10 | + "regexp" | |
| 11 | + "strings" | |
| 12 | + "sync" | |
| 13 | + "time" | |
| 14 | +) | |
| 15 | + | |
// ansiRE matches terminal control sequences that are stripped from mirrored
// output: CSI sequences (ESC [ ... letter), OSC sequences terminated by BEL,
// the "ESC ( B" charset selection, and bare private-mode toggles such as
// "[?25h" that appear without a leading ESC.
var ansiRE = regexp.MustCompile(`\x1b\[[0-9;?]*[a-zA-Z]|\x1b\][^\x07]*\x07|\x1b\(B|\[\?[0-9]+[hl]`)

// noiseRE matches common terminal noise: lines made only of braille spinner
// frames and whitespace (including empty lines), lines made only of dots, and
// lines starting with a private-mode toggle remnant such as "?25h".
var noiseRE = regexp.MustCompile(`^[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏\s]*$|^[.]+$|^\[?\?[0-9]+[hl]`)
| 21 | + | |
// PTYMirror turns raw PTY output into clean text lines for IRC, applying
// rate limiting and noise filtering along the way.
type PTYMirror struct {
	maxLineLen  int
	minInterval time.Duration // shortest allowed gap between two emitted lines
	mu          sync.Mutex
	lastEmit    time.Time
	recentLines map[string]time.Time // dedup bookkeeping: line key → last seen
	onLine      func(line string)
	// BusyCallback, when set, is invoked whenever PTY output looks like the
	// agent is busy (e.g. "esc to interrupt", "working...").
	BusyCallback func(now time.Time)
}

// NewPTYMirror builds a mirror that reports each output line through onLine.
// maxLineLen caps the length of a line (0 = no limit).
// minInterval throttles output (0 = no throttle, recommended: 500ms for IRC).
func NewPTYMirror(maxLineLen int, minInterval time.Duration, onLine func(line string)) *PTYMirror {
	mirror := new(PTYMirror)
	mirror.maxLineLen = maxLineLen
	mirror.minInterval = minInterval
	mirror.onLine = onLine
	// Allocated up front so the map is always writable.
	mirror.recentLines = map[string]time.Time{}
	return mirror
}
| 47 | + | |
| 48 | +// Copy reads from r (typically a PTY fd) and also writes to w (typically | |
| 49 | +// os.Stdout for the interactive terminal). Lines are emitted via onLine. | |
| 50 | +// Blocks until r returns EOF or error. | |
| 51 | +func (m *PTYMirror) Copy(r io.Reader, w io.Writer) error { | |
| 52 | + buf := make([]byte, 4096) | |
| 53 | + lineCh := make(chan []byte, | |
| 54 | + lineBuf.Write(chunk) | |
| 55 | +buf) | |
| 56 | + if n > 0 { | |
| 57 | + // Detect busy signals for interrupt logic. | |
| 58 | + if m.BusyCallback != nil { | |
| 59 | + lower := strings.ToLower(string(buf[:n])) | |
| 60 | + if strings.Contains(lower, "esc to interrupt") || strings.Contains(lower, "working...") { | |
| 61 | + m.BusyCallback(time.Now()) | |
| 62 | + } | |
| 63 | + } | |
| 64 | + // Pass through to terminal — ALWAYS immediate, neveine hash → last seen | |
| 65 | + onLine Buffer and emit lines.hared PTY output m// Packerr != nil { | |
| 66 | + / m.emitLine(lineBuf.String()r := r.Read(buf) | |
| 67 | + if n > 0 { | |
| 68 | + }ror provides shared PTY output mirroring for relay binaries. | |
| 69 | +// | |
| 70 | +// PTYMirror reads from a PTY file descriptor and emits lines to a callback. | |
| 71 | +// It handles ANSI escape stripping and line buffering for clean IRC output. | |
| 72 | +package relaymirror | |
| 73 | + | |
| 74 | +import ( | |
| 75 | + "bytes" | |
| 76 | + "io" | |
| 77 | + "regexp" | |
| 78 | + "strings" | |
| 79 | + "sync" | |
| 80 | + "time" | |
| 81 | +) | |
| 82 | + | |
| 83 | +// ansiRE ma. | |
| 84 | +var ansiRE = regep.MustCompile(`\x1b\[[0-9;?]*[a-\x07|\x1b\(B|\[\?[0-9]+[hl]`) | |
| 85 | + | |
| 86 | +// noiseRE matches common terminal noise: spinner chars, progress bars, cursor movement. | |
| 87 | +var noiseRE = regexp.MustCompile(`^[⠋�s*$|^[.]+$|^\[?\?[0-9]+[hl]`) | |
| 88 | + | |
| 89 | +// PTYMirror reads PTY output and emits clean text lines to IRC. | |
| 90 | +// It includes rate limiting and noise filtering for clean IRC output. | |
| 91 | +type PTYMirror struct { | |
| 92 | + maxLineLen int | |
| 93 | + minInterval time.Duration // minimum time between emitted lines | |
| 94 | + mu sync.Mutex | |
| 95 | + lastEmit time.Time | |
| 96 | + recentLines map[string]time.Time // dedup: line hash → last seen | |
| 97 | + onLine func(line string) | |
| 98 | + // BusyCallback is called when PTY output suggests the agent is busy | |
| 99 | + // (e.g. "esc to interrupt", "working..."). Optional. | |
| 100 | + BusyCallback func(now time.Time) | |
| 101 | +} | |
| 102 | + | |
| 103 | +// NewPTYMirror creates a mirror that calls onLine for each output line. | |
| 104 | +// maxLineLen truncates long lines (0 = no limit). | |
| 105 | +// minInterval throttles output (0 = no throttle, recommended: 500ms for IRC). | |
| 106 | +func NewPTYMirror(maxLineLen int, minInterval time.Duration, onLine func(line string)) *PTYMirror { | |
| 107 | + return &PTYMirror{ | |
| 108 | + maxLineLen: maxLineLen, | |
| 109 | + minInterval: minInterval, | |
| 110 | + recentLines: make(map[string]time.Time), | |
| 111 | + onLine: onLine, | |
| 112 | + } | |
| 113 | +} | |
| 114 | + | |
| 115 | +// Copy reads from r (typically a PTY fd) and also writes to w (typically | |
| 116 | +// os.Stdout for the interactive terminal). Lines are emitted via onLine. | |
| 117 | +// Blocks until r returns EOF or er// Package relaymirror provides shared PTY output mirroring for relay binaries. | |
| 118 | +// | |
| 119 | +// PTYMirror reads from a PTY file |
| --- a/pkg/relaymirror/pty.go | |
| +++ b/pkg/relaymirror/pty.go | |
| @@ -0,0 +1,119 @@ | |
| --- a/pkg/relaymirror/pty.go | |
| +++ b/pkg/relaymirror/pty.go | |
| @@ -0,0 +1,119 @@ | |
| 1 | // Package relaymirror provides shared PTY output mirroring for relay binaries. |
| 2 | // |
| 3 | // PTYMirror reads from a PTY file descriptor and emits lines to a callback. |
| 4 | // It handles ANSI escape stripping and line buffering for clean IRC output. |
| 5 | package relaymirror |
| 6 | |
| 7 | import ( |
| 8 | "bytes" |
| 9 | "io" |
| 10 | "regexp" |
| 11 | "strings" |
| 12 | "sync" |
| 13 | "time" |
| 14 | ) |
| 15 | |
// ansiRE matches terminal control sequences that are stripped from mirrored
// output: CSI sequences (ESC [ ... letter), OSC sequences terminated by BEL,
// the "ESC ( B" charset selection, and bare private-mode toggles such as
// "[?25h" that appear without a leading ESC.
var ansiRE = regexp.MustCompile(`\x1b\[[0-9;?]*[a-zA-Z]|\x1b\][^\x07]*\x07|\x1b\(B|\[\?[0-9]+[hl]`)

// noiseRE matches common terminal noise: lines made only of braille spinner
// frames and whitespace (including empty lines), lines made only of dots, and
// lines starting with a private-mode toggle remnant such as "?25h".
var noiseRE = regexp.MustCompile(`^[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏\s]*$|^[.]+$|^\[?\?[0-9]+[hl]`)
| 21 | |
// PTYMirror turns raw PTY output into clean text lines for IRC, applying
// rate limiting and noise filtering along the way.
type PTYMirror struct {
	maxLineLen  int
	minInterval time.Duration // shortest allowed gap between two emitted lines
	mu          sync.Mutex
	lastEmit    time.Time
	recentLines map[string]time.Time // dedup bookkeeping: line key → last seen
	onLine      func(line string)
	// BusyCallback, when set, is invoked whenever PTY output looks like the
	// agent is busy (e.g. "esc to interrupt", "working...").
	BusyCallback func(now time.Time)
}

// NewPTYMirror builds a mirror that reports each output line through onLine.
// maxLineLen caps the length of a line (0 = no limit).
// minInterval throttles output (0 = no throttle, recommended: 500ms for IRC).
func NewPTYMirror(maxLineLen int, minInterval time.Duration, onLine func(line string)) *PTYMirror {
	mirror := new(PTYMirror)
	mirror.maxLineLen = maxLineLen
	mirror.minInterval = minInterval
	mirror.onLine = onLine
	// Allocated up front so the map is always writable.
	mirror.recentLines = map[string]time.Time{}
	return mirror
}
| 47 | |
| 48 | // Copy reads from r (typically a PTY fd) and also writes to w (typically |
| 49 | // os.Stdout for the interactive terminal). Lines are emitted via onLine. |
| 50 | // Blocks until r returns EOF or error. |
| 51 | func (m *PTYMirror) Copy(r io.Reader, w io.Writer) error { |
| 52 | buf := make([]byte, 4096) |
| 53 | lineCh := make(chan []byte, |
| 54 | lineBuf.Write(chunk) |
| 55 | buf) |
| 56 | if n > 0 { |
| 57 | // Detect busy signals for interrupt logic. |
| 58 | if m.BusyCallback != nil { |
| 59 | lower := strings.ToLower(string(buf[:n])) |
| 60 | if strings.Contains(lower, "esc to interrupt") || strings.Contains(lower, "working...") { |
| 61 | m.BusyCallback(time.Now()) |
| 62 | } |
| 63 | } |
| 64 | // Pass through to terminal — ALWAYS immediate, neveine hash → last seen |
| 65 | onLine Buffer and emit lines.hared PTY output m// Packerr != nil { |
| 66 | / m.emitLine(lineBuf.String()r := r.Read(buf) |
| 67 | if n > 0 { |
| 68 | }ror provides shared PTY output mirroring for relay binaries. |
| 69 | // |
| 70 | // PTYMirror reads from a PTY file descriptor and emits lines to a callback. |
| 71 | // It handles ANSI escape stripping and line buffering for clean IRC output. |
| 72 | package relaymirror |
| 73 | |
| 74 | import ( |
| 75 | "bytes" |
| 76 | "io" |
| 77 | "regexp" |
| 78 | "strings" |
| 79 | "sync" |
| 80 | "time" |
| 81 | ) |
| 82 | |
| 83 | // ansiRE ma. |
| 84 | var ansiRE = regep.MustCompile(`\x1b\[[0-9;?]*[a-\x07|\x1b\(B|\[\?[0-9]+[hl]`) |
| 85 | |
| 86 | // noiseRE matches common terminal noise: spinner chars, progress bars, cursor movement. |
| 87 | var noiseRE = regexp.MustCompile(`^[⠋�s*$|^[.]+$|^\[?\?[0-9]+[hl]`) |
| 88 | |
| 89 | // PTYMirror reads PTY output and emits clean text lines to IRC. |
| 90 | // It includes rate limiting and noise filtering for clean IRC output. |
| 91 | type PTYMirror struct { |
| 92 | maxLineLen int |
| 93 | minInterval time.Duration // minimum time between emitted lines |
| 94 | mu sync.Mutex |
| 95 | lastEmit time.Time |
| 96 | recentLines map[string]time.Time // dedup: line hash → last seen |
| 97 | onLine func(line string) |
| 98 | // BusyCallback is called when PTY output suggests the agent is busy |
| 99 | // (e.g. "esc to interrupt", "working..."). Optional. |
| 100 | BusyCallback func(now time.Time) |
| 101 | } |
| 102 | |
| 103 | // NewPTYMirror creates a mirror that calls onLine for each output line. |
| 104 | // maxLineLen truncates long lines (0 = no limit). |
| 105 | // minInterval throttles output (0 = no throttle, recommended: 500ms for IRC). |
| 106 | func NewPTYMirror(maxLineLen int, minInterval time.Duration, onLine func(line string)) *PTYMirror { |
| 107 | return &PTYMirror{ |
| 108 | maxLineLen: maxLineLen, |
| 109 | minInterval: minInterval, |
| 110 | recentLines: make(map[string]time.Time), |
| 111 | onLine: onLine, |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | // Copy reads from r (typically a PTY fd) and also writes to w (typically |
| 116 | // os.Stdout for the interactive terminal). Lines are emitted via onLine. |
| 117 | // Blocks until r returns EOF or er// Package relaymirror provides shared PTY output mirroring for relay binaries. |
| 118 | // |
| 119 | // PTYMirror reads from a PTY file |
+128
| --- a/pkg/relaymirror/pty_test.go | ||
| +++ b/pkg/relaymirror/pty_test.go | ||
| @@ -0,0 +1,128 @@ | ||
| 1 | +package relaymirror | |
| 2 | + | |
| 3 | +import ( | |
| 4 | + "bytes" | |
| 5 | + "strings" | |
| 6 | + "testing" | |
| 7 | + "time" | |
| 8 | +) | |
| 9 | + | |
| 10 | +func TestPTYMirrorBasic(t *testing.T) { | |
| 11 | + var lines []string | |
| 12 | + m := NewPTYMirror(0, 0, func(line string) { | |
| 13 | + lines = append(lines, line) | |
| 14 | + }) | |
| 15 | + | |
| 16 | + input := "hello world\ngoodbye\n" | |
| 17 | + err := m.Copy(strings.NewReader(input), nil) | |
| 18 | + if err != nil { | |
| 19 | + t.Fatal(err) | |
| 20 | + } | |
| 21 | + if len(lines) != 2 { | |
| 22 | + t.Fatalf("expected 2 lines, got %d: %v", len(lines), lines) | |
| 23 | + } | |
| 24 | + if lines[0] != "hello world" { | |
| 25 | + t.Errorf("line 0 = %q", lines[0]) | |
| 26 | + } | |
| 27 | + if lines[1] != "goodbye" { | |
| 28 | + t.Errorf("line 1 = %q", lines[1]) | |
| 29 | + } | |
| 30 | +} | |
| 31 | + | |
| 32 | +func TestPTYMirrorStripANSI(t *testing.T) { | |
| 33 | + var lines []string | |
| 34 | + m := NewPTYMirror(0, 0, func(line string) { | |
| 35 | + lines = append(lines, line) | |
| 36 | + }) | |
| 37 | + | |
| 38 | + input := "\x1b[32mgreen text\x1b[0m\n" | |
| 39 | + _ = m.Copy(strings.NewReader(input), nil) | |
| 40 | + | |
| 41 | + if len(lines) != 1 || lines[0] != "green text" { | |
| 42 | + t.Errorf("expected 'green text', got %v", lines) | |
| 43 | + } | |
| 44 | +} | |
| 45 | + | |
| 46 | +func TestPTYMirrorPassthrough(t *testing.T) { | |
| 47 | + var buf bytes.Buffer | |
| 48 | + m := NewPTYMirror(0, 0, func(string) {}) | |
| 49 | + | |
| 50 | + input := "hello\n" | |
| 51 | + _ = m.Copy(strings.NewReader(input), &buf) | |
| 52 | + | |
| 53 | + if buf.String() != input { | |
| 54 | + t.Errorf("passthrough = %q, want %q", buf.String(), input) | |
| 55 | + } | |
| 56 | +} | |
| 57 | + | |
| 58 | +func TestPTYMirrorMaxLineLen(t *testing.T) { | |
| 59 | + var lines []string | |
| 60 | + m := NewPTYMirror(20, 0, func(line string) { | |
| 61 | + lines = append(lines, line) | |
| 62 | + }) | |
| 63 | + | |
| 64 | + input := "this is a very long line that should be truncated\n" | |
| 65 | + _ = m.Copy(strings.NewReader(input), nil) | |
| 66 | + | |
| 67 | + if len(lines) != 1 || len(lines[0]) > 20 { | |
| 68 | + t.Errorf("expected truncated line <= 20 chars, got %q", lines[0]) | |
| 69 | + } | |
| 70 | +} | |
| 71 | + | |
| 72 | +func TestPTYMirrorNoise(t *testing.T) { | |
| 73 | + var lines []string | |
| 74 | + m := NewPTYMirror(0, 0, func(line string) { | |
| 75 | + lines = append(lines, line) | |
| 76 | + }) | |
| 77 | + | |
| 78 | + input := "real output\n⠋\n...\n50%\nmore output\n" | |
| 79 | + _ = m.Copy(strings.NewReader(input), nil) | |
| 80 | + | |
| 81 | + if len(lines) != 2 { | |
| 82 | + t.Fatalf("expected 2 lines (noise filtered), got %d: %v", len(lines), lines) | |
| 83 | + } | |
| 84 | +} | |
| 85 | + | |
| 86 | +func TestPTYMirrorDedup(t *testing.T) { | |
| 87 | + var lines []string | |
| 88 | + m := NewPTYMirror(0, 0, func(line string) { | |
| 89 | + lines = append(lines, line) | |
| 90 | + }) | |
| 91 | + | |
| 92 | + input := "same line\nsame line\ndifferent\n" | |
| 93 | + _ = m.Copy(strings.NewReader(input), nil) | |
| 94 | + | |
| 95 | + if len(lines) != 2 { | |
| 96 | + t.Fatalf("expected 2 lines (dedup), got %d: %v", len(lines), lines) | |
| 97 | + } | |
| 98 | +} | |
| 99 | + | |
| 100 | +func TestPTYMirrorMarkSeen(t *testing.T) { | |
| 101 | + var lines []string | |
| 102 | + m := NewPTYMirror(0, 0, func(line string) { | |
| 103 | + lines = append(lines, line) | |
| 104 | + }) | |
| 105 | + | |
| 106 | + // Mark a line as seen (from session file mirror). | |
| 107 | + m.MarkSeen("already seen") | |
| 108 | + | |
| 109 | + input := "already seen\nnew line\n" | |
| 110 | + _ = m.Copy(strings.NewReader(input), nil) | |
| 111 | + | |
| 112 | + if len(lines) != 1 || lines[0] != "new line" { | |
| 113 | + t.Errorf("expected only 'new line', got %v", lines) | |
| 114 | + } | |
| 115 | +} | |
| 116 | + | |
| 117 | +func TestPTYMirrorBusyCallback(t *testing.T) { | |
| 118 | + var busyAt time.Time | |
| 119 | + m := NewPTYMirror(0, 0, func(string) {}) | |
| 120 | + m.BusyCallback = func(now time.Time) { busyAt = now } | |
| 121 | + | |
| 122 | + input := "esc to interrupt\n" | |
| 123 | + _ = m.Copy(strings.NewReader(input), nil) | |
| 124 | + | |
| 125 | + if busyAt.IsZero() { | |
| 126 | + t.Error("busy callback was not called") | |
| 127 | + } | |
| 128 | +} |
| --- a/pkg/relaymirror/pty_test.go | |
| +++ b/pkg/relaymirror/pty_test.go | |
| @@ -0,0 +1,128 @@ | |
| --- a/pkg/relaymirror/pty_test.go | |
| +++ b/pkg/relaymirror/pty_test.go | |
| @@ -0,0 +1,128 @@ | |
| 1 | package relaymirror |
| 2 | |
| 3 | import ( |
| 4 | "bytes" |
| 5 | "strings" |
| 6 | "testing" |
| 7 | "time" |
| 8 | ) |
| 9 | |
| 10 | func TestPTYMirrorBasic(t *testing.T) { |
| 11 | var lines []string |
| 12 | m := NewPTYMirror(0, 0, func(line string) { |
| 13 | lines = append(lines, line) |
| 14 | }) |
| 15 | |
| 16 | input := "hello world\ngoodbye\n" |
| 17 | err := m.Copy(strings.NewReader(input), nil) |
| 18 | if err != nil { |
| 19 | t.Fatal(err) |
| 20 | } |
| 21 | if len(lines) != 2 { |
| 22 | t.Fatalf("expected 2 lines, got %d: %v", len(lines), lines) |
| 23 | } |
| 24 | if lines[0] != "hello world" { |
| 25 | t.Errorf("line 0 = %q", lines[0]) |
| 26 | } |
| 27 | if lines[1] != "goodbye" { |
| 28 | t.Errorf("line 1 = %q", lines[1]) |
| 29 | } |
| 30 | } |
| 31 | |
| 32 | func TestPTYMirrorStripANSI(t *testing.T) { |
| 33 | var lines []string |
| 34 | m := NewPTYMirror(0, 0, func(line string) { |
| 35 | lines = append(lines, line) |
| 36 | }) |
| 37 | |
| 38 | input := "\x1b[32mgreen text\x1b[0m\n" |
| 39 | _ = m.Copy(strings.NewReader(input), nil) |
| 40 | |
| 41 | if len(lines) != 1 || lines[0] != "green text" { |
| 42 | t.Errorf("expected 'green text', got %v", lines) |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | func TestPTYMirrorPassthrough(t *testing.T) { |
| 47 | var buf bytes.Buffer |
| 48 | m := NewPTYMirror(0, 0, func(string) {}) |
| 49 | |
| 50 | input := "hello\n" |
| 51 | _ = m.Copy(strings.NewReader(input), &buf) |
| 52 | |
| 53 | if buf.String() != input { |
| 54 | t.Errorf("passthrough = %q, want %q", buf.String(), input) |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | func TestPTYMirrorMaxLineLen(t *testing.T) { |
| 59 | var lines []string |
| 60 | m := NewPTYMirror(20, 0, func(line string) { |
| 61 | lines = append(lines, line) |
| 62 | }) |
| 63 | |
| 64 | input := "this is a very long line that should be truncated\n" |
| 65 | _ = m.Copy(strings.NewReader(input), nil) |
| 66 | |
| 67 | if len(lines) != 1 || len(lines[0]) > 20 { |
| 68 | t.Errorf("expected truncated line <= 20 chars, got %q", lines[0]) |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | func TestPTYMirrorNoise(t *testing.T) { |
| 73 | var lines []string |
| 74 | m := NewPTYMirror(0, 0, func(line string) { |
| 75 | lines = append(lines, line) |
| 76 | }) |
| 77 | |
| 78 | input := "real output\n⠋\n...\n50%\nmore output\n" |
| 79 | _ = m.Copy(strings.NewReader(input), nil) |
| 80 | |
| 81 | if len(lines) != 2 { |
| 82 | t.Fatalf("expected 2 lines (noise filtered), got %d: %v", len(lines), lines) |
| 83 | } |
| 84 | } |
| 85 | |
| 86 | func TestPTYMirrorDedup(t *testing.T) { |
| 87 | var lines []string |
| 88 | m := NewPTYMirror(0, 0, func(line string) { |
| 89 | lines = append(lines, line) |
| 90 | }) |
| 91 | |
| 92 | input := "same line\nsame line\ndifferent\n" |
| 93 | _ = m.Copy(strings.NewReader(input), nil) |
| 94 | |
| 95 | if len(lines) != 2 { |
| 96 | t.Fatalf("expected 2 lines (dedup), got %d: %v", len(lines), lines) |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | func TestPTYMirrorMarkSeen(t *testing.T) { |
| 101 | var lines []string |
| 102 | m := NewPTYMirror(0, 0, func(line string) { |
| 103 | lines = append(lines, line) |
| 104 | }) |
| 105 | |
| 106 | // Mark a line as seen (from session file mirror). |
| 107 | m.MarkSeen("already seen") |
| 108 | |
| 109 | input := "already seen\nnew line\n" |
| 110 | _ = m.Copy(strings.NewReader(input), nil) |
| 111 | |
| 112 | if len(lines) != 1 || lines[0] != "new line" { |
| 113 | t.Errorf("expected only 'new line', got %v", lines) |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | func TestPTYMirrorBusyCallback(t *testing.T) { |
| 118 | var busyAt time.Time |
| 119 | m := NewPTYMirror(0, 0, func(string) {}) |
| 120 | m.BusyCallback = func(now time.Time) { busyAt = now } |
| 121 | |
| 122 | input := "esc to interrupt\n" |
| 123 | _ = m.Copy(strings.NewReader(input), nil) |
| 124 | |
| 125 | if busyAt.IsZero() { |
| 126 | t.Error("busy callback was not called") |
| 127 | } |
| 128 | } |
+123
| --- a/pkg/relaymirror/session.go | ||
| +++ b/pkg/relaymirror/session.go | ||
| @@ -0,0 +1,123 @@ | ||
| 1 | +package relaymirror | |
| 2 | + | |
| 3 | +import ( | |
| 4 | + "context" | |
| 5 | + "encoding/json" | |
| 6 | + "fmt" | |
| 7 | + "os" | |
| 8 | + "path/filepath" | |
| 9 | + "sort" | |
| 10 | + "strings" | |
| 11 | + "time" | |
| 12 | +) | |
| 13 | + | |
// SessionWatcher polls a directory until a new session file shows up.
// Designed for Gemini CLI session discovery.
type SessionWatcher struct {
	dir     string
	prefix  string // e.g. "session-"
	timeout time.Duration
}

// NewSessionWatcher creates a watcher for session files matching prefix in dir.
// timeout bounds how long Discover waits for a file to appear.
func NewSessionWatcher(dir, prefix string, timeout time.Duration) *SessionWatcher {
	return &SessionWatcher{dir: dir, prefix: prefix, timeout: timeout}
}

// Discover waits for a new session file to appear in the directory and
// returns its full path.
//
// A file qualifies when it is not a directory, its name starts with the
// watcher's prefix, and its name is not a key of existingFiles. When several
// files qualify, the one with the most recent modification time wins (ties
// are broken by name so the result is deterministic).
//
// The directory is scanned once immediately and then every 500ms. Discover
// returns ctx.Err() when ctx is cancelled and a timeout error once the
// watcher's timeout elapses. A directory that cannot be read (for example
// because it does not exist yet) is simply retried.
func (w *SessionWatcher) Discover(ctx context.Context, existingFiles map[string]bool) (string, error) {
	deadline := time.NewTimer(w.timeout)
	defer deadline.Stop()
	tick := time.NewTicker(500 * time.Millisecond)
	defer tick.Stop()

	// candidate pairs a file name with its mod time, captured once so the
	// sort comparator is cheap and consistent.
	type candidate struct {
		name string
		mod  time.Time
	}

	for {
		if err := ctx.Err(); err != nil {
			return "", err
		}

		if entries, err := os.ReadDir(w.dir); err == nil {
			var candidates []candidate
			for _, e := range entries {
				name := e.Name()
				if e.IsDir() || !strings.HasPrefix(name, w.prefix) || existingFiles[name] {
					continue
				}
				info, err := e.Info()
				if err != nil {
					// The file vanished between ReadDir and stat; it is not a
					// usable session file.
					continue
				}
				candidates = append(candidates, candidate{name: name, mod: info.ModTime()})
			}
			if len(candidates) > 0 {
				// Newest first.
				sort.Slice(candidates, func(i, j int) bool {
					if !candidates[i].mod.Equal(candidates[j].mod) {
						return candidates[i].mod.After(candidates[j].mod)
					}
					return candidates[i].name < candidates[j].name
				})
				return filepath.Join(w.dir, candidates[0].name), nil
			}
		}

		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-deadline.C:
			return "", fmt.Errorf("session discovery timed out after %s", w.timeout)
		case <-tick.C:
		}
	}
}
| 72 | + | |
// SnapshotDir lists dir and returns the entry names as a set. It returns nil
// when the directory cannot be read.
func SnapshotDir(dir string) map[string]bool {
	listing, err := os.ReadDir(dir)
	if err != nil {
		return nil
	}
	names := make(map[string]bool, len(listing))
	for i := range listing {
		names[listing[i].Name()] = true
	}
	return names
}
| 85 | + | |
// GeminiMessage is one entry of the "messages" array in a Gemini CLI session
// file.
type GeminiMessage struct {
	Type      string           `json:"type"` // "user", "gemini"
	Content   string           `json:"content,omitempty"`
	ToolCalls []GeminiToolCall `json:"toolCalls,omitempty"`
}

// GeminiToolCall is a tool call in a Gemini session. Args and Result are kept
// as raw JSON so they can be forwarded without being interpreted.
type GeminiToolCall struct {
	Name   string          `json:"name"`
	Args   json.RawMessage `json:"args"`
	Result json.RawMessage `json:"result,omitempty"`
	Status string          `json:"status"`
}

// GeminiSession is the top-level structure of a Gemini session file.
type GeminiSession struct {
	SessionID string          `json:"sessionId"`
	Messages  []GeminiMessage `json:"messages"`
}

// PollGeminiSession reads the Gemini session file at path and returns the
// messages at index sinceIdx and beyond, together with the index to pass on
// the next call.
//
// A negative sinceIdx is treated as 0. On success the returned index is the
// number of messages currently in the file, even when there is nothing new,
// so a caller whose index ran ahead of a rewritten (shorter) file is brought
// back in sync. On a read or JSON decoding error the messages are nil and the
// index is returned unchanged so the caller can retry.
func PollGeminiSession(path string, sinceIdx int) ([]GeminiMessage, int, error) {
	// Guard the slice expression below against a negative start index.
	if sinceIdx < 0 {
		sinceIdx = 0
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, sinceIdx, err
	}
	var session GeminiSession
	if err := json.Unmarshal(data, &session); err != nil {
		return nil, sinceIdx, err
	}
	total := len(session.Messages)
	if total <= sinceIdx {
		return nil, total, nil
	}
	return session.Messages[sinceIdx:], total, nil
}
| --- a/pkg/relaymirror/session.go | |
| +++ b/pkg/relaymirror/session.go | |
| @@ -0,0 +1,123 @@ | |
| --- a/pkg/relaymirror/session.go | |
| +++ b/pkg/relaymirror/session.go | |
| @@ -0,0 +1,123 @@ | |
| 1 | package relaymirror |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "encoding/json" |
| 6 | "fmt" |
| 7 | "os" |
| 8 | "path/filepath" |
| 9 | "sort" |
| 10 | "strings" |
| 11 | "time" |
| 12 | ) |
| 13 | |
// SessionWatcher polls a directory until a new session file shows up.
// Designed for Gemini CLI session discovery.
type SessionWatcher struct {
	dir     string
	prefix  string // e.g. "session-"
	timeout time.Duration
}

// NewSessionWatcher creates a watcher for session files matching prefix in dir.
// timeout bounds how long Discover waits for a file to appear.
func NewSessionWatcher(dir, prefix string, timeout time.Duration) *SessionWatcher {
	return &SessionWatcher{dir: dir, prefix: prefix, timeout: timeout}
}

// Discover waits for a new session file to appear in the directory and
// returns its full path.
//
// A file qualifies when it is not a directory, its name starts with the
// watcher's prefix, and its name is not a key of existingFiles. When several
// files qualify, the one with the most recent modification time wins (ties
// are broken by name so the result is deterministic).
//
// The directory is scanned once immediately and then every 500ms. Discover
// returns ctx.Err() when ctx is cancelled and a timeout error once the
// watcher's timeout elapses. A directory that cannot be read (for example
// because it does not exist yet) is simply retried.
func (w *SessionWatcher) Discover(ctx context.Context, existingFiles map[string]bool) (string, error) {
	deadline := time.NewTimer(w.timeout)
	defer deadline.Stop()
	tick := time.NewTicker(500 * time.Millisecond)
	defer tick.Stop()

	// candidate pairs a file name with its mod time, captured once so the
	// sort comparator is cheap and consistent.
	type candidate struct {
		name string
		mod  time.Time
	}

	for {
		if err := ctx.Err(); err != nil {
			return "", err
		}

		if entries, err := os.ReadDir(w.dir); err == nil {
			var candidates []candidate
			for _, e := range entries {
				name := e.Name()
				if e.IsDir() || !strings.HasPrefix(name, w.prefix) || existingFiles[name] {
					continue
				}
				info, err := e.Info()
				if err != nil {
					// The file vanished between ReadDir and stat; it is not a
					// usable session file.
					continue
				}
				candidates = append(candidates, candidate{name: name, mod: info.ModTime()})
			}
			if len(candidates) > 0 {
				// Newest first.
				sort.Slice(candidates, func(i, j int) bool {
					if !candidates[i].mod.Equal(candidates[j].mod) {
						return candidates[i].mod.After(candidates[j].mod)
					}
					return candidates[i].name < candidates[j].name
				})
				return filepath.Join(w.dir, candidates[0].name), nil
			}
		}

		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-deadline.C:
			return "", fmt.Errorf("session discovery timed out after %s", w.timeout)
		case <-tick.C:
		}
	}
}
| 72 | |
// SnapshotDir lists dir and returns the name of every entry (files and
// subdirectories alike) as a set. It returns nil when the directory cannot
// be read, and an empty non-nil map when the directory is empty.
func SnapshotDir(dir string) map[string]bool {
	listing, readErr := os.ReadDir(dir)
	if readErr != nil {
		return nil
	}

	names := make(map[string]bool, len(listing))
	for i := range listing {
		names[listing[i].Name()] = true
	}
	return names
}
| 85 | |
| 86 | // GeminiMessage is a message from a Gemini CLI session file. |
| 87 | type GeminiMessage struct { |
| 88 | Type string `json:"type"` // "user", "gemini" |
| 89 | Content string `json:"content,omitempty"` |
| 90 | ToolCalls []GeminiToolCall `json:"toolCalls,omitempty"` |
| 91 | } |
| 92 | |
// GeminiToolCall describes a single tool invocation recorded on a message in
// a Gemini session file.
type GeminiToolCall struct {
	// Name is the tool's name.
	Name string `json:"name"`
	// Args is the raw, undecoded JSON of the call's arguments.
	Args json.RawMessage `json:"args"`
	// Result is the raw, undecoded JSON of the call's result; it is omitted
	// from JSON when empty.
	Result json.RawMessage `json:"result,omitempty"`
	// Status is the call's status string as written in the session file.
	Status string `json:"status"`
}
| 100 | |
| 101 | // GeminiSession is the top-level structure of a Gemini session file. |
| 102 | type GeminiSession struct { |
| 103 | SessionID string `json:"sessionId"` |
| 104 | Messages []GeminiMessage `json:"messages"` |
| 105 | } |
| 106 | |
| 107 | // PollGeminiSession reads a Gemini session file and returns messages since |
| 108 | // the given index. Returns the new message count. |
| 109 | func PollGeminiSession(path string, sinceIdx int) ([]GeminiMessage, int, error) { |
| 110 | data, err := os.ReadFile(path) |
| 111 | if err != nil { |
| 112 | return nil, sinceIdx, err |
| 113 | } |
| 114 | var session GeminiSession |
| 115 | if err := json.Unmarshal(data, &session); err != nil { |
| 116 | return nil, sinceIdx, err |
| 117 | } |
| 118 | if len(session.Messages) <= sinceIdx { |
| 119 | return nil, sinceIdx, nil |
| 120 | } |
| 121 | newMsgs := session.Messages[sinceIdx:] |
| 122 | return newMsgs, len(session.Messages), nil |
| 123 | } |