// Package supervisor manages a single long-running stdio subprocess on the
// broker's behalf — typically `forgejo-mcp --transport stdio`, but the
// implementation is protocol-agnostic.
//
// Lifecycle: Start spawns the child and wires three pipes (stdin/stdout for
// JSON-RPC, stderr drained line-by-line into a caller-supplied callback).
// A goroutine calls cmd.Wait so the kernel reaps the process — without it
// you collect zombies. Stop closes the child's stdin, SIGTERMs it, waits a
// grace period, and escalates to SIGKILL if the child has not exited by then
// or if the caller's stop context is canceled first.
//
// The package owns no protocol knowledge. Bridge code (phase 4) writes
// newline-framed JSON-RPC messages to Stdin and reads them back from
// Stdout; this package never inspects them.
package supervisor

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"sync"
	"syscall"
	"time"
)

// DefaultStopGrace is the time Stop waits between SIGTERM and SIGKILL when
// Config.StopGrace is zero or negative.
const DefaultStopGrace = 5 * time.Second

// Config describes a child to supervise. Cmd is required and Cmd[0] is the
// binary; Cmd[1:] are arguments. Everything else is optional.
type Config struct {
	// Cmd is the command line: binary first, then its arguments. Start
	// rejects an empty slice.
	Cmd []string
	// Env is appended to the parent's env; later entries win. When empty,
	// Start leaves the command's environment unset.
	Env []string
	// StopGrace overrides DefaultStopGrace. Values <= 0 select the default.
	StopGrace time.Duration
	// OnStderr is called for every stderr line with the trailing newline
	// removed, including a final line that was not newline-terminated. It
	// runs on the goroutine that drains stderr, not on the caller's. A nil
	// callback means stderr is still read but discarded.
	OnStderr func(line string)
}

// Child is a running supervised subprocess.
type Child struct {
	// Stdin accepts newline-framed JSON-RPC messages addressed to the child.
	// Closing it signals EOF; well-behaved stdio servers exit on EOF.
	Stdin io.WriteCloser
	// Stdout streams the child's stdout buffered for line reads.
	Stdout *bufio.Reader

	// cmd is the underlying command; used for signalling and the PID.
	cmd *exec.Cmd
	// stopGrace is the resolved SIGTERM-to-SIGKILL delay used by Stop.
	stopGrace time.Duration
	// doneCh is closed by the reaper goroutine once cmd.Wait has returned
	// and exitErr has been recorded.
	doneCh chan struct{}

	// mu guards exitErr and stopped.
	mu sync.Mutex
	// exitErr is the value returned by cmd.Wait.
	exitErr error
	// stopped is set by the first call to Stop.
	stopped bool
}

// Start launches a subprocess according to cfg and returns a Child the
// caller can read/write/stop. ctx applies to the spawn itself; once the
// child is running, lifecycle is governed by Stop or by the child exiting
// on its own.
func Start(ctx context.Context, cfg Config) (*Child, error) { if len(cfg.Cmd) == 0 { return nil, errors.New("supervisor: Cmd is required") } cmd := exec.CommandContext(ctx, cfg.Cmd[0], cfg.Cmd[1:]...) if len(cfg.Env) > 0 { // Inherit parent env, then append caller overrides. exec uses the // last value when keys collide on Linux, so later cfg.Env entries // win over identically-named parent vars. cmd.Env = append(os.Environ(), cfg.Env...) } stdin, err := cmd.StdinPipe() if err != nil { return nil, fmt.Errorf("supervisor: stdin pipe: %w", err) } stdout, err := cmd.StdoutPipe() if err != nil { _ = stdin.Close() return nil, fmt.Errorf("supervisor: stdout pipe: %w", err) } stderr, err := cmd.StderrPipe() if err != nil { _ = stdin.Close() _ = stdout.Close() return nil, fmt.Errorf("supervisor: stderr pipe: %w", err) } if err := cmd.Start(); err != nil { _ = stdin.Close() return nil, fmt.Errorf("supervisor: start %q: %w", cfg.Cmd[0], err) } // Close-handles we may need to clean up explicitly post-Wait. Some // kernels don't deliver EOF to drainStderr promptly under load when // using cmd.StderrPipe; an explicit Close after Wait ensures // drainStderr exits and FDs aren't leaked across high-frequency // spawn/reap cycles. stdoutCloser, _ := stdout.(io.Closer) stderrCloser, _ := stderr.(io.Closer) c := &Child{ Stdin: stdin, Stdout: bufio.NewReader(stdout), cmd: cmd, stopGrace: cfg.StopGrace, doneCh: make(chan struct{}), } if c.stopGrace <= 0 { c.stopGrace = DefaultStopGrace } // Drain stderr so the child never blocks on a full pipe. If the caller // didn't supply an OnStderr callback, we still need to read the bytes — // just throw them away. onStderr := cfg.OnStderr if onStderr == nil { onStderr = func(string) {} } go drainStderr(stderr, onStderr) // Reap the child. cmd.Wait must be called exactly once; do it here so // nobody else has to remember. 
After Wait, defensively close the // stdout/stderr handles so drainStderr definitely exits and our // reference count drops to zero — important under stress loads where // the kernel doesn't always deliver EOF promptly. go func() { err := cmd.Wait() if stdoutCloser != nil { _ = stdoutCloser.Close() } if stderrCloser != nil { _ = stderrCloser.Close() } c.mu.Lock() c.exitErr = err c.mu.Unlock() close(c.doneCh) }() return c, nil } // Done is closed once the child has exited and Wait has been called. // After Done closes, ExitErr is safe to inspect. func (c *Child) Done() <-chan struct{} { return c.doneCh } // ExitErr returns the child's exit error, or nil if it exited cleanly. // Calling before Done has closed returns nil and is racy — callers must // wait on Done() first. func (c *Child) ExitErr() error { c.mu.Lock() defer c.mu.Unlock() return c.exitErr } // Pid returns the child's process ID, or 0 if the process has already been // released. Useful for log correlation. func (c *Child) Pid() int { if c.cmd.Process == nil { return 0 } return c.cmd.Process.Pid } // Stop attempts to terminate the child. The sequence: // 1. Close Stdin — well-behaved stdio servers exit on EOF. // 2. Send SIGTERM. // 3. Wait up to StopGrace for the child to exit. // 4. If still running (or if stopCtx is canceled), send SIGKILL. // 5. Wait for the reaper goroutine to finish. // // Returns the child's exit error, mirroring ExitErr(). Calling Stop more // than once is safe; subsequent calls are no-ops. func (c *Child) Stop(stopCtx context.Context) error { c.mu.Lock() if c.stopped { c.mu.Unlock() <-c.doneCh return c.exitErr } c.stopped = true c.mu.Unlock() // Close stdin first so the child has a chance to drain and exit cleanly. _ = c.Stdin.Close() // SIGTERM second. Errors here likely mean the process already exited; // fall through to the wait below either way. 
_ = c.cmd.Process.Signal(syscall.SIGTERM) timer := time.NewTimer(c.stopGrace) defer timer.Stop() select { case <-c.doneCh: return c.ExitErr() case <-stopCtx.Done(): // Caller canceled the stop; escalate immediately. case <-timer.C: // Grace expired; escalate. } _ = c.cmd.Process.Kill() <-c.doneCh return c.ExitErr() } // drainStderr reads stderr line-by-line and forwards each line (without the // trailing newline) to onStderr. Returns silently when the pipe closes. func drainStderr(r io.Reader, onStderr func(string)) { br := bufio.NewReader(r) for { line, err := br.ReadString('\n') if line != "" { // Trim trailing newline; preserve everything else. n := len(line) if n > 0 && line[n-1] == '\n' { line = line[:n-1] } onStderr(line) } if err != nil { return } } }