ScuttleBot
wip: relay reconnection watchdog (not yet working). Adds a watchdog goroutine to the IRC connector that probes the API and force-closes dead connections. Also re-registers SASL credentials on reconnect. The watchdog goroutine starts, but its ticks do not fire for unknown reasons — tracked in #51.
Commit
5336d3377baa3f8934edab9f1eb28baba0e1afdc6ef5255e3bbd65906c0ed768
Parent
54be8d26cb8d16e…
1 file changed
+69
-2
+69
-2
| --- pkg/sessionrelay/irc.go | ||
| +++ pkg/sessionrelay/irc.go | ||
| @@ -87,10 +87,11 @@ | ||
| 87 | 87 | case err := <-c.errCh: |
| 88 | 88 | _ = c.cleanupRegistration(context.Background()) |
| 89 | 89 | return fmt.Errorf("sessionrelay: irc connect: %w", err) |
| 90 | 90 | case <-joined: |
| 91 | 91 | go c.keepAlive(ctx, host, port) |
| 92 | + go c.watchdog(ctx) | |
| 92 | 93 | return nil |
| 93 | 94 | } |
| 94 | 95 | } |
| 95 | 96 | |
| 96 | 97 | // dial creates a fresh girc client, wires up handlers, and starts the |
| @@ -165,11 +166,12 @@ | ||
| 165 | 166 | wait := ircReconnectMin |
| 166 | 167 | for { |
| 167 | 168 | select { |
| 168 | 169 | case <-ctx.Done(): |
| 169 | 170 | return |
| 170 | - case <-c.errCh: | |
| 171 | + case err := <-c.errCh: | |
| 172 | + fmt.Fprintf(os.Stderr, "sessionrelay: connection lost: %v\n", err) | |
| 171 | 173 | } |
| 172 | 174 | |
| 173 | 175 | // Close the dead client before replacing it. |
| 174 | 176 | c.mu.Lock() |
| 175 | 177 | if c.client != nil { |
| @@ -181,10 +183,11 @@ | ||
| 181 | 183 | select { |
| 182 | 184 | case <-ctx.Done(): |
| 183 | 185 | return |
| 184 | 186 | case <-time.After(wait): |
| 185 | 187 | } |
| 188 | + fmt.Fprintf(os.Stderr, "sessionrelay: reconnecting (backoff %v)...\n", wait) | |
| 186 | 189 | |
| 187 | 190 | // Re-register to get fresh SASL credentials in case the server |
| 188 | 191 | // restarted and the Ergo database was reset. |
| 189 | 192 | c.pass = "" // clear stale creds |
| 190 | 193 | if err := c.ensureCredentials(ctx); err != nil { |
| @@ -197,13 +200,77 @@ | ||
| 197 | 200 | default: |
| 198 | 201 | } |
| 199 | 202 | }() |
| 200 | 203 | continue |
| 201 | 204 | } |
| 205 | + fmt.Fprintf(os.Stderr, "sessionrelay: credentials refreshed, dialing...\n", ) | |
| 202 | 206 | |
| 203 | 207 | wait = min(wait*2, ircReconnectMax) |
| 204 | - c.dial(host, port, func() { wait = ircReconnectMin }) | |
| 208 | + c.dial(host, port, func() { | |
| 209 | + wait = ircReconnectMin | |
| 210 | + fmt.Fprintf(os.Stderr, "sessionrelay: reconnected successfully\n") | |
| 211 | + }) | |
| 212 | + } | |
| 213 | +} | |
| 214 | + | |
| 215 | +// watchdog periodically checks if the IRC client is still connected and | |
| 216 | +// if the API is reachable. Forces reconnection when the connection is dead. | |
| 217 | +func (c *ircConnector) watchdog(ctx context.Context) { | |
| 218 | + failures := 0 | |
| 219 | + for { | |
| 220 | + select { | |
| 221 | + case <-ctx.Done(): | |
| 222 | + return | |
| 223 | + case <-time.After(10 * time.Second): | |
| 224 | + } | |
| 225 | + | |
| 226 | + c.mu.RLock() | |
| 227 | + client := c.client | |
| 228 | + c.mu.RUnlock() | |
| 229 | + if client == nil { | |
| 230 | + failures = 0 | |
| 231 | + continue | |
| 232 | + } | |
| 233 | + | |
| 234 | + if !client.IsConnected() { | |
| 235 | + client.Close() | |
| 236 | + select { | |
| 237 | + case c.errCh <- fmt.Errorf("watchdog: client disconnected"): | |
| 238 | + default: | |
| 239 | + } | |
| 240 | + failures = 0 | |
| 241 | + continue | |
| 242 | + } | |
| 243 | + | |
| 244 | + // Probe the API to detect server restarts. | |
| 245 | + if c.apiURL != "" && c.token != "" { | |
| 246 | + probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) | |
| 247 | + req, _ := http.NewRequestWithContext(probeCtx, http.MethodGet, c.apiURL+"/v1/status", nil) | |
| 248 | + if req != nil { | |
| 249 | + req.Header.Set("Authorization", "Bearer "+c.token) | |
| 250 | + resp, err := http.DefaultClient.Do(req) | |
| 251 | + if err != nil || resp.StatusCode != 200 { | |
| 252 | + failures++ | |
| 253 | + if resp != nil { | |
| 254 | + resp.Body.Close() | |
| 255 | + } | |
| 256 | + } else { | |
| 257 | + resp.Body.Close() | |
| 258 | + failures = 0 | |
| 259 | + } | |
| 260 | + } | |
| 261 | + cancel() | |
| 262 | + } | |
| 263 | + | |
| 264 | + if failures >= 3 { | |
| 265 | + client.Close() | |
| 266 | + select { | |
| 267 | + case c.errCh <- fmt.Errorf("watchdog: API unreachable"): | |
| 268 | + default: | |
| 269 | + } | |
| 270 | + failures = 0 | |
| 271 | + } | |
| 205 | 272 | } |
| 206 | 273 | } |
| 207 | 274 | |
| 208 | 275 | func (c *ircConnector) Post(_ context.Context, text string) error { |
| 209 | 276 | c.mu.RLock() |
| 210 | 277 |
| --- pkg/sessionrelay/irc.go | |
| +++ pkg/sessionrelay/irc.go | |
| @@ -87,10 +87,11 @@ | |
| 87 | case err := <-c.errCh: |
| 88 | _ = c.cleanupRegistration(context.Background()) |
| 89 | return fmt.Errorf("sessionrelay: irc connect: %w", err) |
| 90 | case <-joined: |
| 91 | go c.keepAlive(ctx, host, port) |
| 92 | return nil |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | // dial creates a fresh girc client, wires up handlers, and starts the |
| @@ -165,11 +166,12 @@ | |
| 165 | wait := ircReconnectMin |
| 166 | for { |
| 167 | select { |
| 168 | case <-ctx.Done(): |
| 169 | return |
| 170 | case <-c.errCh: |
| 171 | } |
| 172 | |
| 173 | // Close the dead client before replacing it. |
| 174 | c.mu.Lock() |
| 175 | if c.client != nil { |
| @@ -181,10 +183,11 @@ | |
| 181 | select { |
| 182 | case <-ctx.Done(): |
| 183 | return |
| 184 | case <-time.After(wait): |
| 185 | } |
| 186 | |
| 187 | // Re-register to get fresh SASL credentials in case the server |
| 188 | // restarted and the Ergo database was reset. |
| 189 | c.pass = "" // clear stale creds |
| 190 | if err := c.ensureCredentials(ctx); err != nil { |
| @@ -197,13 +200,77 @@ | |
| 197 | default: |
| 198 | } |
| 199 | }() |
| 200 | continue |
| 201 | } |
| 202 | |
| 203 | wait = min(wait*2, ircReconnectMax) |
| 204 | c.dial(host, port, func() { wait = ircReconnectMin }) |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | func (c *ircConnector) Post(_ context.Context, text string) error { |
| 209 | c.mu.RLock() |
| 210 |
| --- pkg/sessionrelay/irc.go | |
| +++ pkg/sessionrelay/irc.go | |
| @@ -87,10 +87,11 @@ | |
| 87 | case err := <-c.errCh: |
| 88 | _ = c.cleanupRegistration(context.Background()) |
| 89 | return fmt.Errorf("sessionrelay: irc connect: %w", err) |
| 90 | case <-joined: |
| 91 | go c.keepAlive(ctx, host, port) |
| 92 | go c.watchdog(ctx) |
| 93 | return nil |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | // dial creates a fresh girc client, wires up handlers, and starts the |
| @@ -165,11 +166,12 @@ | |
| 166 | wait := ircReconnectMin |
| 167 | for { |
| 168 | select { |
| 169 | case <-ctx.Done(): |
| 170 | return |
| 171 | case err := <-c.errCh: |
| 172 | fmt.Fprintf(os.Stderr, "sessionrelay: connection lost: %v\n", err) |
| 173 | } |
| 174 | |
| 175 | // Close the dead client before replacing it. |
| 176 | c.mu.Lock() |
| 177 | if c.client != nil { |
| @@ -181,10 +183,11 @@ | |
| 183 | select { |
| 184 | case <-ctx.Done(): |
| 185 | return |
| 186 | case <-time.After(wait): |
| 187 | } |
| 188 | fmt.Fprintf(os.Stderr, "sessionrelay: reconnecting (backoff %v)...\n", wait) |
| 189 | |
| 190 | // Re-register to get fresh SASL credentials in case the server |
| 191 | // restarted and the Ergo database was reset. |
| 192 | c.pass = "" // clear stale creds |
| 193 | if err := c.ensureCredentials(ctx); err != nil { |
| @@ -197,13 +200,77 @@ | |
| 200 | default: |
| 201 | } |
| 202 | }() |
| 203 | continue |
| 204 | } |
| 205 | fmt.Fprintf(os.Stderr, "sessionrelay: credentials refreshed, dialing...\n", ) |
| 206 | |
| 207 | wait = min(wait*2, ircReconnectMax) |
| 208 | c.dial(host, port, func() { |
| 209 | wait = ircReconnectMin |
| 210 | fmt.Fprintf(os.Stderr, "sessionrelay: reconnected successfully\n") |
| 211 | }) |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | // watchdog periodically checks if the IRC client is still connected and |
| 216 | // if the API is reachable. Forces reconnection when the connection is dead. |
| 217 | func (c *ircConnector) watchdog(ctx context.Context) { |
| 218 | failures := 0 |
| 219 | for { |
| 220 | select { |
| 221 | case <-ctx.Done(): |
| 222 | return |
| 223 | case <-time.After(10 * time.Second): |
| 224 | } |
| 225 | |
| 226 | c.mu.RLock() |
| 227 | client := c.client |
| 228 | c.mu.RUnlock() |
| 229 | if client == nil { |
| 230 | failures = 0 |
| 231 | continue |
| 232 | } |
| 233 | |
| 234 | if !client.IsConnected() { |
| 235 | client.Close() |
| 236 | select { |
| 237 | case c.errCh <- fmt.Errorf("watchdog: client disconnected"): |
| 238 | default: |
| 239 | } |
| 240 | failures = 0 |
| 241 | continue |
| 242 | } |
| 243 | |
| 244 | // Probe the API to detect server restarts. |
| 245 | if c.apiURL != "" && c.token != "" { |
| 246 | probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) |
| 247 | req, _ := http.NewRequestWithContext(probeCtx, http.MethodGet, c.apiURL+"/v1/status", nil) |
| 248 | if req != nil { |
| 249 | req.Header.Set("Authorization", "Bearer "+c.token) |
| 250 | resp, err := http.DefaultClient.Do(req) |
| 251 | if err != nil || resp.StatusCode != 200 { |
| 252 | failures++ |
| 253 | if resp != nil { |
| 254 | resp.Body.Close() |
| 255 | } |
| 256 | } else { |
| 257 | resp.Body.Close() |
| 258 | failures = 0 |
| 259 | } |
| 260 | } |
| 261 | cancel() |
| 262 | } |
| 263 | |
| 264 | if failures >= 3 { |
| 265 | client.Close() |
| 266 | select { |
| 267 | case c.errCh <- fmt.Errorf("watchdog: API unreachable"): |
| 268 | default: |
| 269 | } |
| 270 | failures = 0 |
| 271 | } |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | func (c *ircConnector) Post(_ context.Context, text string) error { |
| 276 | c.mu.RLock() |
| 277 |