feat(supervisor): managed stdio subprocess (forgejo-mcp-broker-zuq)
Adds internal/supervisor: a thin wrapper around os/exec that handles the zombie/leak/escalation concerns once, so phase-4 (bridge) and phase-5 (session glue) don't each have to re-derive them. Lifecycle (Stop): 1. Close stdin — well-behaved stdio servers exit on EOF 2. Send SIGTERM 3. Wait up to StopGrace (default 5s) for exit 4. SIGKILL if still alive Reaping is mandatory: a goroutine calls cmd.Wait so the kernel actually collects the child. Without it you accumulate zombies under N concurrent sessions. Tests exercise this via the helper-process pattern (TestMain re-execs the test binary in helper mode) — no shell or external binary dependency. Tests cover: empty Cmd validation, missing-binary error, echo round trip via stdin/stdout, stderr drainer collecting lines, SIGTERM-friendly graceful stop, SIGTERM-ignoring child escalating to SIGKILL (with a ready-on-stdout sync barrier so the test isn't racing the helper's signal.Notify), idempotent Stop, clean exit detection, non-zero exit detection, env override propagation. 89.6% coverage; remaining gap is unreachable-from-public-API defensive branches (pipe-creation failures under FD exhaustion, post-release Pid). Manual smoke test against a real `forgejo-mcp --transport stdio` is deferred to phase 4b's integration test (where it adds the most value). Closes forgejo-mcp-broker-zuq. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
006d5c1448
commit
7be7f5e199
3 changed files with 538 additions and 2 deletions
|
|
@ -5,9 +5,9 @@
|
||||||
{"id":"forgejo-mcp-broker-31t","title":"Phase 3b: supervisor stress tests (FD/goroutine/zombie leak detection)","description":"1000 spawn/stop cycles under -race. Verify no FD leak, no goroutine leak (go.uber.org/goleak), no zombies (wait4 returns ECHILD when idle).","acceptance_criteria":"Cycle test passes under -race. FD count stable within a small constant. goleak detects no extra goroutines after test.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-31t","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":0,"comment_count":0}
|
{"id":"forgejo-mcp-broker-31t","title":"Phase 3b: supervisor stress tests (FD/goroutine/zombie leak detection)","description":"1000 spawn/stop cycles under -race. Verify no FD leak, no goroutine leak (go.uber.org/goleak), no zombies (wait4 returns ECHILD when idle).","acceptance_criteria":"Cycle test passes under -race. FD count stable within a small constant. goleak detects no extra goroutines after test.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-31t","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":0,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-am1","title":"Phase 4a: internal/bridge JSON-RPC pipe + SSE writer","description":"Given a supervisor.Child: inbound HTTP JSON -\u003e newline-framed stdin; stdout lines -\u003e SSE frames. Handle client disconnect without killing the child.","acceptance_criteria":"Unit tests with mock Child that echoes: request/response round trip, multiple concurrent requests with correct id routing, client disconnect mid-stream.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-am1","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:27Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":2,"comment_count":0}
|
{"id":"forgejo-mcp-broker-am1","title":"Phase 4a: internal/bridge JSON-RPC pipe + SSE writer","description":"Given a supervisor.Child: inbound HTTP JSON -\u003e newline-framed stdin; stdout lines -\u003e SSE frames. Handle client disconnect without killing the child.","acceptance_criteria":"Unit tests with mock Child that echoes: request/response round trip, multiple concurrent requests with correct id routing, client disconnect mid-stream.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-am1","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:27Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":2,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-wgo","title":"Phase 2e: OAuth security review + attack-path tests","description":"Phase 2 exit gate. Review every handler for classic OAuth vulns (open redirect, code replay, mix-up, token leak in logs, host spoofing). Add at least one test per attack class. Update design.md §8 with findings.","acceptance_criteria":"Review checklist documented. Tests added for: PKCE mismatch, stale code, token absent from log attributes, bad redirect_uri, mismatched state, replay of used code.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:14Z","dependencies":[{"issue_id":"forgejo-mcp-broker-wgo","depends_on_id":"forgejo-mcp-broker-b2o","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-wgo","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:25Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":0,"comment_count":0}
|
{"id":"forgejo-mcp-broker-wgo","title":"Phase 2e: OAuth security review + attack-path tests","description":"Phase 2 exit gate. Review every handler for classic OAuth vulns (open redirect, code replay, mix-up, token leak in logs, host spoofing). Add at least one test per attack class. Update design.md §8 with findings.","acceptance_criteria":"Review checklist documented. Tests added for: PKCE mismatch, stale code, token absent from log attributes, bad redirect_uri, mismatched state, replay of used code.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:14Z","dependencies":[{"issue_id":"forgejo-mcp-broker-wgo","depends_on_id":"forgejo-mcp-broker-b2o","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-wgo","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:25Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":0,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-zuq","title":"Phase 3a: internal/supervisor managed stdio subprocess","description":"Child type: Start, Stop(ctx) with SIGTERM -\u003e grace -\u003e SIGKILL, Wait+reap goroutine (no zombies), stderr drainer with prefix. Protocol-agnostic.","acceptance_criteria":"Unit tests against an echo-loop helper: round trip, graceful stop, kill-after-grace, child-exits-on-own detection, stderr capture. Manual spawn of real forgejo-mcp --transport stdio works.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:14Z","dependency_count":0,"dependent_count":3,"comment_count":0}
|
{"id":"forgejo-mcp-broker-zuq","title":"Phase 3a: internal/supervisor managed stdio subprocess","description":"Child type: Start, Stop(ctx) with SIGTERM -\u003e grace -\u003e SIGKILL, Wait+reap goroutine (no zombies), stderr drainer with prefix. Protocol-agnostic.","acceptance_criteria":"Unit tests against an echo-loop helper: round trip, graceful stop, kill-after-grace, child-exits-on-own detection, stderr capture. Manual spawn of real forgejo-mcp --transport stdio works.","status":"in_progress","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:32:54Z","started_at":"2026-04-27T11:32:54Z","dependency_count":0,"dependent_count":3,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-b2o","title":"Phase 2d: OAuth discovery endpoints (/.well-known/*)","description":"GET /.well-known/oauth-protected-resource and /.well-known/oauth-authorization-server. Issuer URLs MUST derive from cfg.PublicURL, never inbound headers (host-header attack defense per design.md §8).","acceptance_criteria":"Responses validate against RFC 8414/9728 shapes. Issuer URL sourced from config only. supported_scopes matches cfg.ForgejoOAuthScopes.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:13Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:13Z","dependencies":[{"issue_id":"forgejo-mcp-broker-b2o","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:25Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0}
|
{"id":"forgejo-mcp-broker-b2o","title":"Phase 2d: OAuth discovery endpoints (/.well-known/*)","description":"GET /.well-known/oauth-protected-resource and /.well-known/oauth-authorization-server. Issuer URLs MUST derive from cfg.PublicURL, never inbound headers (host-header attack defense per design.md §8).","acceptance_criteria":"Responses validate against RFC 8414/9728 shapes. Issuer URL sourced from config only. supported_scopes matches cfg.ForgejoOAuthScopes.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:13Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:13Z","dependencies":[{"issue_id":"forgejo-mcp-broker-b2o","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:25Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-b9i","title":"Phase 2b: internal/forgejo OAuth client","description":"Broker-side OAuth client for upstream Forgejo: authorize URL builder, code-to-token exchange, refresh_token grant, userinfo fetch, revoke. Used by AS callback and refresh machinery. Stateless; caller owns persistence.","acceptance_criteria":"Unit tests with httptest.Server fake Forgejo cover each grant plus error paths (wrong code, expired refresh, revoked token). No state persisted in this package.","status":"in_progress","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:12Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:29:17Z","started_at":"2026-04-27T11:29:17Z","dependency_count":0,"dependent_count":1,"comment_count":0}
|
{"id":"forgejo-mcp-broker-b9i","title":"Phase 2b: internal/forgejo OAuth client","description":"Broker-side OAuth client for upstream Forgejo: authorize URL builder, code-to-token exchange, refresh_token grant, userinfo fetch, revoke. Used by AS callback and refresh machinery. Stateless; caller owns persistence.","acceptance_criteria":"Unit tests with httptest.Server fake Forgejo cover each grant plus error paths (wrong code, expired refresh, revoked token). No state persisted in this package.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:12Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:31:27Z","started_at":"2026-04-27T11:29:17Z","closed_at":"2026-04-27T11:31:27Z","close_reason":"internal/forgejo shipped: AuthorizeURL, ExchangeCode, Refresh, FetchUserInfo. Structured *forgejo.Error for OAuth failures (errors.As-friendly). 95.1% coverage. Stateless — caller owns persistence. Revocation deferred since upstream Forgejo lacks the endpoint.","dependency_count":0,"dependent_count":1,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-pur","title":"Phase 2c: internal/oauth AS endpoints (register, authorize, callback, token, revoke)","description":"Five OAuth handlers per design.md §4.1. RFC 7591 DCR with ephemeral client IDs, authorize -\u003e Forgejo delegation, callback minting broker auth codes, token exchange with SHA-256 hashing at rest, revoke. PKCE S256 required.","acceptance_criteria":"End-to-end curl walkthrough from plan.md phase 2 passes. All tokens hashed at rest. Auth codes single-use, 10-min TTL. Rejects missing PKCE, non-S256, wrong verifier, expired codes/tokens. Handler coverage \u003e=80%.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:12Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:12Z","dependencies":[{"issue_id":"forgejo-mcp-broker-pur","depends_on_id":"forgejo-mcp-broker-b9i","type":"blocks","created_at":"2026-04-24T17:45:24Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-pur","depends_on_id":"forgejo-mcp-broker-cpb","type":"blocks","created_at":"2026-04-24T17:45:24Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":5,"comment_count":0}
|
{"id":"forgejo-mcp-broker-pur","title":"Phase 2c: internal/oauth AS endpoints (register, authorize, callback, token, revoke)","description":"Five OAuth handlers per design.md §4.1. RFC 7591 DCR with ephemeral client IDs, authorize -\u003e Forgejo delegation, callback minting broker auth codes, token exchange with SHA-256 hashing at rest, revoke. PKCE S256 required.","acceptance_criteria":"End-to-end curl walkthrough from plan.md phase 2 passes. All tokens hashed at rest. Auth codes single-use, 10-min TTL. Rejects missing PKCE, non-S256, wrong verifier, expired codes/tokens. Handler coverage \u003e=80%.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:12Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:12Z","dependencies":[{"issue_id":"forgejo-mcp-broker-pur","depends_on_id":"forgejo-mcp-broker-b9i","type":"blocks","created_at":"2026-04-24T17:45:24Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-pur","depends_on_id":"forgejo-mcp-broker-cpb","type":"blocks","created_at":"2026-04-24T17:45:24Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":5,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-cpb","title":"Phase 2a: OAuth tables migration","description":"Add migrations/0002_oauth_tables.sql creating clients, auth_codes, access_tokens, refresh_tokens per design.md §4.2. Broker tokens stored as SHA-256 hashes; Forgejo tokens cleartext (subprocess spawning requires them). See plan.md phase 2.","acceptance_criteria":"Migration applies on a fresh DB and is idempotent on reopen. Schema matches design.md §4.2. Tests verify every table and key column exists.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:04Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:28:20Z","started_at":"2026-04-27T11:26:17Z","closed_at":"2026-04-27T11:28:20Z","close_reason":"0002_oauth_tables.sql shipped: clients/auth_codes/access_tokens/refresh_tokens with cascading FKs, indexes on hot-path columns, and an oauth_schema_version marker. PRAGMA-driven tests verify columns; FK cascade tested in both directions.","dependency_count":0,"dependent_count":1,"comment_count":0}
|
{"id":"forgejo-mcp-broker-cpb","title":"Phase 2a: OAuth tables migration","description":"Add migrations/0002_oauth_tables.sql creating clients, auth_codes, access_tokens, refresh_tokens per design.md §4.2. Broker tokens stored as SHA-256 hashes; Forgejo tokens cleartext (subprocess spawning requires them). See plan.md phase 2.","acceptance_criteria":"Migration applies on a fresh DB and is idempotent on reopen. Schema matches design.md §4.2. Tests verify every table and key column exists.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:04Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:28:20Z","started_at":"2026-04-27T11:26:17Z","closed_at":"2026-04-27T11:28:20Z","close_reason":"0002_oauth_tables.sql shipped: clients/auth_codes/access_tokens/refresh_tokens with cascading FKs, indexes on hot-path columns, and an oauth_schema_version marker. PRAGMA-driven tests verify columns; FK cascade tested in both directions.","dependency_count":0,"dependent_count":1,"comment_count":0}
|
||||||
{"id":"forgejo-mcp-broker-8ei","title":"Phase 1: internal/httpserver with /healthz and graceful shutdown","description":"Implement internal/httpserver: constructs a *http.Server bound to cfg.Listen, mounts GET /healthz (returns 200 with JSON build-info from the build-info package, including version, git revision, build date, and current store status), handles SIGTERM/SIGINT by initiating graceful shutdown with a configurable deadline (default 10s). Uses log/slog for structured JSON logs. Exposes a Run(ctx) error method that blocks until shutdown completes.","acceptance_criteria":"go test ./internal/httpserver passes; GET /healthz returns expected JSON; sending SIGTERM causes Run to return nil within 2 seconds after in-flight requests complete; slow in-flight request is allowed to finish within the deadline, then forcibly closed.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T14:46:20Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:26:43Z","started_at":"2026-04-24T15:24:09Z","closed_at":"2026-04-24T15:26:43Z","close_reason":"httpserver shipped: /healthz with store probe, graceful shutdown with force-close fallback, ExtraHandler extension point. 97.9% coverage. internal/log also implemented in the same commit (100% coverage).","dependencies":[{"issue_id":"forgejo-mcp-broker-8ei","depends_on_id":"forgejo-mcp-broker-n84","type":"blocks","created_at":"2026-04-24T16:46:19Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0}
|
{"id":"forgejo-mcp-broker-8ei","title":"Phase 1: internal/httpserver with /healthz and graceful shutdown","description":"Implement internal/httpserver: constructs a *http.Server bound to cfg.Listen, mounts GET /healthz (returns 200 with JSON build-info from the build-info package, including version, git revision, build date, and current store status), handles SIGTERM/SIGINT by initiating graceful shutdown with a configurable deadline (default 10s). Uses log/slog for structured JSON logs. Exposes a Run(ctx) error method that blocks until shutdown completes.","acceptance_criteria":"go test ./internal/httpserver passes; GET /healthz returns expected JSON; sending SIGTERM causes Run to return nil within 2 seconds after in-flight requests complete; slow in-flight request is allowed to finish within the deadline, then forcibly closed.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T14:46:20Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:26:43Z","started_at":"2026-04-24T15:24:09Z","closed_at":"2026-04-24T15:26:43Z","close_reason":"httpserver shipped: /healthz with store probe, graceful shutdown with force-close fallback, ExtraHandler extension point. 97.9% coverage. internal/log also implemented in the same commit (100% coverage).","dependencies":[{"issue_id":"forgejo-mcp-broker-8ei","depends_on_id":"forgejo-mcp-broker-n84","type":"blocks","created_at":"2026-04-24T16:46:19Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0}
|
||||||
|
|
|
||||||
213
internal/supervisor/supervisor.go
Normal file
213
internal/supervisor/supervisor.go
Normal file
|
|
@ -0,0 +1,213 @@
|
||||||
|
// Package supervisor manages a single long-running stdio subprocess on the
|
||||||
|
// broker's behalf — typically `forgejo-mcp --transport stdio`, but the
|
||||||
|
// implementation is protocol-agnostic.
|
||||||
|
//
|
||||||
|
// Lifecycle: Start spawns the child and wires three pipes (stdin/stdout for
|
||||||
|
// JSON-RPC, stderr drained line-by-line into a caller-supplied callback).
|
||||||
|
// A goroutine calls cmd.Wait so the kernel reaps the process — without it
|
||||||
|
// you collect zombies. Stop SIGTERMs the child, waits a grace period, and
|
||||||
|
// escalates to SIGKILL only if the child refuses to exit.
|
||||||
|
//
|
||||||
|
// The package owns no protocol knowledge. Bridge code (phase 4) writes
|
||||||
|
// newline-framed JSON-RPC messages to Stdin and reads them back from
|
||||||
|
// Stdout; this package never inspects them.
|
||||||
|
package supervisor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DefaultStopGrace is the time Stop waits between SIGTERM and SIGKILL.
|
||||||
|
const DefaultStopGrace = 5 * time.Second
|
||||||
|
|
||||||
|
// Config describes a child to supervise. Cmd is required and Cmd[0] is the
|
||||||
|
// binary; Cmd[1:] are arguments. Everything else is optional.
|
||||||
|
type Config struct {
|
||||||
|
Cmd []string
|
||||||
|
Env []string // appended to the parent's env; later entries win
|
||||||
|
StopGrace time.Duration // override DefaultStopGrace
|
||||||
|
OnStderr func(line string) // called for every newline-terminated stderr line
|
||||||
|
}
|
||||||
|
|
||||||
|
// Child is a running supervised subprocess.
|
||||||
|
type Child struct {
|
||||||
|
// Stdin accepts newline-framed JSON-RPC messages addressed to the child.
|
||||||
|
// Closing it signals EOF; well-behaved stdio servers exit on EOF.
|
||||||
|
Stdin io.WriteCloser
|
||||||
|
// Stdout streams the child's stdout buffered for line reads.
|
||||||
|
Stdout *bufio.Reader
|
||||||
|
|
||||||
|
cmd *exec.Cmd
|
||||||
|
stopGrace time.Duration
|
||||||
|
|
||||||
|
doneCh chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
exitErr error
|
||||||
|
stopped bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start launches a subprocess according to cfg and returns a Child the
|
||||||
|
// caller can read/write/stop. ctx applies to the spawn itself; once the
|
||||||
|
// child is running, lifecycle is governed by Stop or by the child exiting
|
||||||
|
// on its own.
|
||||||
|
func Start(ctx context.Context, cfg Config) (*Child, error) {
|
||||||
|
if len(cfg.Cmd) == 0 {
|
||||||
|
return nil, errors.New("supervisor: Cmd is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := exec.CommandContext(ctx, cfg.Cmd[0], cfg.Cmd[1:]...)
|
||||||
|
if len(cfg.Env) > 0 {
|
||||||
|
// Inherit parent env, then append caller overrides. exec uses the
|
||||||
|
// last value when keys collide on Linux, so later cfg.Env entries
|
||||||
|
// win over identically-named parent vars.
|
||||||
|
cmd.Env = append(os.Environ(), cfg.Env...)
|
||||||
|
}
|
||||||
|
|
||||||
|
stdin, err := cmd.StdinPipe()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("supervisor: stdin pipe: %w", err)
|
||||||
|
}
|
||||||
|
stdout, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
_ = stdin.Close()
|
||||||
|
return nil, fmt.Errorf("supervisor: stdout pipe: %w", err)
|
||||||
|
}
|
||||||
|
stderr, err := cmd.StderrPipe()
|
||||||
|
if err != nil {
|
||||||
|
_ = stdin.Close()
|
||||||
|
_ = stdout.Close()
|
||||||
|
return nil, fmt.Errorf("supervisor: stderr pipe: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
_ = stdin.Close()
|
||||||
|
return nil, fmt.Errorf("supervisor: start %q: %w", cfg.Cmd[0], err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &Child{
|
||||||
|
Stdin: stdin,
|
||||||
|
Stdout: bufio.NewReader(stdout),
|
||||||
|
cmd: cmd,
|
||||||
|
stopGrace: cfg.StopGrace,
|
||||||
|
doneCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
if c.stopGrace <= 0 {
|
||||||
|
c.stopGrace = DefaultStopGrace
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain stderr so the child never blocks on a full pipe. If the caller
|
||||||
|
// didn't supply an OnStderr callback, we still need to read the bytes —
|
||||||
|
// just throw them away.
|
||||||
|
onStderr := cfg.OnStderr
|
||||||
|
if onStderr == nil {
|
||||||
|
onStderr = func(string) {}
|
||||||
|
}
|
||||||
|
go drainStderr(stderr, onStderr)
|
||||||
|
|
||||||
|
// Reap the child. cmd.Wait must be called exactly once; do it here so
|
||||||
|
// nobody else has to remember.
|
||||||
|
go func() {
|
||||||
|
err := cmd.Wait()
|
||||||
|
c.mu.Lock()
|
||||||
|
c.exitErr = err
|
||||||
|
c.mu.Unlock()
|
||||||
|
close(c.doneCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done is closed once the child has exited and Wait has been called.
|
||||||
|
// After Done closes, ExitErr is safe to inspect.
|
||||||
|
func (c *Child) Done() <-chan struct{} { return c.doneCh }
|
||||||
|
|
||||||
|
// ExitErr returns the child's exit error, or nil if it exited cleanly.
|
||||||
|
// Calling before Done has closed returns nil and is racy — callers must
|
||||||
|
// wait on Done() first.
|
||||||
|
func (c *Child) ExitErr() error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return c.exitErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pid returns the child's process ID, or 0 if the process has already been
|
||||||
|
// released. Useful for log correlation.
|
||||||
|
func (c *Child) Pid() int {
|
||||||
|
if c.cmd.Process == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return c.cmd.Process.Pid
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop attempts to terminate the child. The sequence:
|
||||||
|
// 1. Close Stdin — well-behaved stdio servers exit on EOF.
|
||||||
|
// 2. Send SIGTERM.
|
||||||
|
// 3. Wait up to StopGrace for the child to exit.
|
||||||
|
// 4. If still running (or if stopCtx is canceled), send SIGKILL.
|
||||||
|
// 5. Wait for the reaper goroutine to finish.
|
||||||
|
//
|
||||||
|
// Returns the child's exit error, mirroring ExitErr(). Calling Stop more
|
||||||
|
// than once is safe; subsequent calls are no-ops.
|
||||||
|
func (c *Child) Stop(stopCtx context.Context) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.stopped {
|
||||||
|
c.mu.Unlock()
|
||||||
|
<-c.doneCh
|
||||||
|
return c.exitErr
|
||||||
|
}
|
||||||
|
c.stopped = true
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
// Close stdin first so the child has a chance to drain and exit cleanly.
|
||||||
|
_ = c.Stdin.Close()
|
||||||
|
|
||||||
|
// SIGTERM second. Errors here likely mean the process already exited;
|
||||||
|
// fall through to the wait below either way.
|
||||||
|
_ = c.cmd.Process.Signal(syscall.SIGTERM)
|
||||||
|
|
||||||
|
timer := time.NewTimer(c.stopGrace)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.doneCh:
|
||||||
|
return c.ExitErr()
|
||||||
|
case <-stopCtx.Done():
|
||||||
|
// Caller canceled the stop; escalate immediately.
|
||||||
|
case <-timer.C:
|
||||||
|
// Grace expired; escalate.
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = c.cmd.Process.Kill()
|
||||||
|
<-c.doneCh
|
||||||
|
return c.ExitErr()
|
||||||
|
}
|
||||||
|
|
||||||
|
// drainStderr reads stderr line-by-line and forwards each line (without the
|
||||||
|
// trailing newline) to onStderr. Returns silently when the pipe closes.
|
||||||
|
func drainStderr(r io.Reader, onStderr func(string)) {
|
||||||
|
br := bufio.NewReader(r)
|
||||||
|
for {
|
||||||
|
line, err := br.ReadString('\n')
|
||||||
|
if line != "" {
|
||||||
|
// Trim trailing newline; preserve everything else.
|
||||||
|
n := len(line)
|
||||||
|
if n > 0 && line[n-1] == '\n' {
|
||||||
|
line = line[:n-1]
|
||||||
|
}
|
||||||
|
onStderr(line)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
323
internal/supervisor/supervisor_test.go
Normal file
323
internal/supervisor/supervisor_test.go
Normal file
|
|
@ -0,0 +1,323 @@
|
||||||
|
package supervisor_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"os/signal"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/supervisor"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestMain implements the helper-process pattern: when invoked with
|
||||||
|
// FJMCP_SUPERVISOR_HELPER set, the test binary acts as a child instead of
|
||||||
|
// running tests. This avoids needing a separate helper binary or shell
|
||||||
|
// dependency.
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
if mode := os.Getenv("FJMCP_SUPERVISOR_HELPER"); mode != "" {
|
||||||
|
runHelper(mode)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
|
func runHelper(mode string) {
|
||||||
|
switch mode {
|
||||||
|
case "echo":
|
||||||
|
// Echo each stdin line back to stdout.
|
||||||
|
s := bufio.NewScanner(os.Stdin)
|
||||||
|
for s.Scan() {
|
||||||
|
fmt.Println(s.Text())
|
||||||
|
}
|
||||||
|
os.Exit(0)
|
||||||
|
|
||||||
|
case "stderr_at_startup":
|
||||||
|
// Print N lines to stderr at startup, then echo loop.
|
||||||
|
n, _ := strconv.Atoi(os.Getenv("FJMCP_HELPER_N"))
|
||||||
|
if n == 0 {
|
||||||
|
n = 3
|
||||||
|
}
|
||||||
|
for i := 1; i <= n; i++ {
|
||||||
|
fmt.Fprintf(os.Stderr, "stderr line %d\n", i)
|
||||||
|
}
|
||||||
|
s := bufio.NewScanner(os.Stdin)
|
||||||
|
for s.Scan() {
|
||||||
|
fmt.Println(s.Text())
|
||||||
|
}
|
||||||
|
os.Exit(0)
|
||||||
|
|
||||||
|
case "ignore_term":
|
||||||
|
// Install SIGTERM handler that swallows the signal, announce
|
||||||
|
// readiness on stdout (so tests have a sync barrier — the parent
|
||||||
|
// must not send SIGTERM before the handler is in place), then
|
||||||
|
// sleep until SIGKILL.
|
||||||
|
sig := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sig, syscall.SIGTERM)
|
||||||
|
go func() {
|
||||||
|
for range sig { /* ignore */
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
fmt.Println("ready")
|
||||||
|
time.Sleep(60 * time.Second)
|
||||||
|
os.Exit(0)
|
||||||
|
|
||||||
|
case "exit_zero":
|
||||||
|
os.Exit(0)
|
||||||
|
|
||||||
|
case "exit_nonzero":
|
||||||
|
os.Exit(7)
|
||||||
|
|
||||||
|
default:
|
||||||
|
fmt.Fprintf(os.Stderr, "unknown helper mode %q\n", mode)
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// helperCmd returns Cmd args + Env that re-exec the test binary as a helper
|
||||||
|
// in the given mode. -test.run=^$ skips all tests (we just want runHelper).
|
||||||
|
func helperCmd(mode string, extraEnv ...string) ([]string, []string) {
|
||||||
|
cmd := []string{os.Args[0], "-test.run=^$"}
|
||||||
|
env := append([]string{"FJMCP_SUPERVISOR_HELPER=" + mode}, extraEnv...)
|
||||||
|
return cmd, env
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_RequiresCmd(t *testing.T) {
|
||||||
|
_, err := supervisor.Start(t.Context(), supervisor.Config{})
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "Cmd is required") {
|
||||||
|
t.Errorf("want Cmd-required error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_BadBinary(t *testing.T) {
|
||||||
|
_, err := supervisor.Start(t.Context(), supervisor.Config{
|
||||||
|
Cmd: []string{"/this/path/does/not/exist"},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error for missing binary")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEcho_RoundTrip(t *testing.T) {
|
||||||
|
cmd, env := helperCmd("echo")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{Cmd: cmd, Env: env})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
defer c.Stop(t.Context())
|
||||||
|
|
||||||
|
if _, err := io.WriteString(c.Stdin, "hello-world\n"); err != nil {
|
||||||
|
t.Fatalf("write: %v", err)
|
||||||
|
}
|
||||||
|
line, err := c.Stdout.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read: %v", err)
|
||||||
|
}
|
||||||
|
if got := strings.TrimRight(line, "\n"); got != "hello-world" {
|
||||||
|
t.Errorf("read %q, want hello-world", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Pid() == 0 {
|
||||||
|
t.Error("Pid should be non-zero while child is running")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStderr_LinesDelivered(t *testing.T) {
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
lines []string
|
||||||
|
)
|
||||||
|
cmd, env := helperCmd("stderr_at_startup", "FJMCP_HELPER_N=4")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{
|
||||||
|
Cmd: cmd,
|
||||||
|
Env: env,
|
||||||
|
OnStderr: func(line string) {
|
||||||
|
mu.Lock()
|
||||||
|
lines = append(lines, line)
|
||||||
|
mu.Unlock()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
defer c.Stop(t.Context())
|
||||||
|
|
||||||
|
// Helper drops the four stderr lines at startup; close stdin so it exits.
|
||||||
|
_ = c.Stdin.Close()
|
||||||
|
select {
|
||||||
|
case <-c.Done():
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("child did not exit after stdin close")
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
if len(lines) != 4 {
|
||||||
|
t.Fatalf("collected %d stderr lines, want 4: %v", len(lines), lines)
|
||||||
|
}
|
||||||
|
for i, want := range []string{"stderr line 1", "stderr line 2", "stderr line 3", "stderr line 4"} {
|
||||||
|
if lines[i] != want {
|
||||||
|
t.Errorf("stderr[%d] = %q, want %q", i, lines[i], want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStop_GracefulOnSIGTERM(t *testing.T) {
|
||||||
|
cmd, env := helperCmd("echo")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{Cmd: cmd, Env: env})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
start := time.Now()
|
||||||
|
if err := c.Stop(t.Context()); err != nil {
|
||||||
|
// Echo exits cleanly on stdin close; ExitErr should be nil. Some
|
||||||
|
// platforms report SIGTERM as an error if the helper got the signal
|
||||||
|
// before stdin EOF reached it — accept either.
|
||||||
|
t.Logf("Stop returned: %v (acceptable)", err)
|
||||||
|
}
|
||||||
|
if elapsed := time.Since(start); elapsed > 2*time.Second {
|
||||||
|
t.Errorf("Stop took %s, want <2s for a SIGTERM-friendly child", elapsed)
|
||||||
|
}
|
||||||
|
// Done must be closed by now.
|
||||||
|
select {
|
||||||
|
case <-c.Done():
|
||||||
|
default:
|
||||||
|
t.Error("Done should be closed after Stop returns")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStop_EscalatesToSIGKILL(t *testing.T) {
|
||||||
|
cmd, env := helperCmd("ignore_term")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{
|
||||||
|
Cmd: cmd,
|
||||||
|
Env: env,
|
||||||
|
StopGrace: 200 * time.Millisecond,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the helper to confirm its SIGTERM handler is installed.
|
||||||
|
// Without this, SIGTERM races signal.Notify and kills the process
|
||||||
|
// outright — the test would then mis-conclude that escalation worked
|
||||||
|
// when actually graceful exit happened.
|
||||||
|
line, err := c.Stdout.ReadString('\n')
|
||||||
|
if err != nil || strings.TrimSpace(line) != "ready" {
|
||||||
|
t.Fatalf("helper readiness sync failed: line=%q err=%v", line, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err = c.Stop(t.Context())
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
|
||||||
|
// SIGKILL'd processes report a non-nil exit error.
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected non-nil exit error after SIGKILL escalation")
|
||||||
|
}
|
||||||
|
// Stop should return a tick or two after the grace period — not, say, 60s.
|
||||||
|
if elapsed > 2*time.Second {
|
||||||
|
t.Errorf("Stop took %s, want fast escalation past grace", elapsed)
|
||||||
|
}
|
||||||
|
if elapsed < 150*time.Millisecond {
|
||||||
|
t.Errorf("Stop took only %s, escalated before grace?", elapsed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStop_IsIdempotent(t *testing.T) {
|
||||||
|
cmd, env := helperCmd("echo")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{Cmd: cmd, Env: env})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
if err := c.Stop(t.Context()); err != nil {
|
||||||
|
t.Logf("first Stop: %v", err)
|
||||||
|
}
|
||||||
|
// Second call must not panic and must return promptly.
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
_ = c.Stop(t.Context())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Error("second Stop hung")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDone_ChildExitsCleanly(t *testing.T) {
|
||||||
|
cmd, env := helperCmd("exit_zero")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{Cmd: cmd, Env: env})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-c.Done():
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("Done did not close")
|
||||||
|
}
|
||||||
|
if err := c.ExitErr(); err != nil {
|
||||||
|
t.Errorf("ExitErr = %v, want nil for clean exit", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDone_ChildExitsBadly(t *testing.T) {
|
||||||
|
cmd, env := helperCmd("exit_nonzero")
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{Cmd: cmd, Env: env})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-c.Done():
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("Done did not close")
|
||||||
|
}
|
||||||
|
err = c.ExitErr()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("ExitErr = nil, want exit error for non-zero exit")
|
||||||
|
}
|
||||||
|
var exitErr *exec.ExitError
|
||||||
|
if !errors.As(err, &exitErr) {
|
||||||
|
t.Errorf("ExitErr = %v, want *exec.ExitError", err)
|
||||||
|
} else if exitErr.ExitCode() != 7 {
|
||||||
|
t.Errorf("ExitCode = %d, want 7", exitErr.ExitCode())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_AppliesEnvOverrides(t *testing.T) {
|
||||||
|
// Verify cfg.Env actually reaches the child by reading FJMCP_HELPER_N
|
||||||
|
// in the stderr_at_startup helper.
|
||||||
|
cmd, env := helperCmd("stderr_at_startup", "FJMCP_HELPER_N=2")
|
||||||
|
|
||||||
|
var collected []string
|
||||||
|
var mu sync.Mutex
|
||||||
|
c, err := supervisor.Start(t.Context(), supervisor.Config{
|
||||||
|
Cmd: cmd,
|
||||||
|
Env: env,
|
||||||
|
OnStderr: func(line string) {
|
||||||
|
mu.Lock()
|
||||||
|
collected = append(collected, line)
|
||||||
|
mu.Unlock()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Start: %v", err)
|
||||||
|
}
|
||||||
|
_ = c.Stdin.Close()
|
||||||
|
<-c.Done()
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
if len(collected) != 2 {
|
||||||
|
t.Errorf("got %d stderr lines, want 2 (env should set N=2): %v", len(collected), collected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue