diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index a50e221..97c4f7f 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -3,9 +3,9 @@ {"id":"forgejo-mcp-broker-t81","title":"Phase 5a: session registry + spawn-on-initialize","description":"Map Mcp-Session-Id -\u003e supervisor.Child + user metadata. On first initialize for unknown sid, spawn forgejo-mcp with user's Forgejo token in env, bind to bridge. LastActive bumped per request.","acceptance_criteria":"Tests with fake supervisor + fake bridge cover: spawn-on-initialize, reuse for subsequent messages, unknown-sid returns 410, max-sessions cap enforced.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:17Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:17Z","dependencies":[{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-am1","type":"blocks","created_at":"2026-04-24T17:45:29Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:30Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:28Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":3,"dependent_count":2,"comment_count":0} {"id":"forgejo-mcp-broker-xot","title":"Phase 4b: bridge integration test against real forgejo-mcp","description":"Drive the bridge with initialize -\u003e tools/list -\u003e tools/call get_forgejo_mcp_server_version against a real forgejo-mcp subprocess. Validates the opaque-pipe assumption.","acceptance_criteria":"Full handshake, tools/list returns expected set, tools/call returns a version string. Tagged as integration test if runtime exceeds 2s.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:16Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:16Z","dependencies":[{"issue_id":"forgejo-mcp-broker-xot","depends_on_id":"forgejo-mcp-broker-am1","type":"blocks","created_at":"2026-04-24T17:45:28Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":0,"comment_count":0} {"id":"forgejo-mcp-broker-31t","title":"Phase 3b: supervisor stress tests (FD/goroutine/zombie leak detection)","description":"1000 spawn/stop cycles under -race. Verify no FD leak, no goroutine leak (go.uber.org/goleak), no zombies (wait4 returns ECHILD when idle).","acceptance_criteria":"Cycle test passes under -race. FD count stable within a small constant. goleak detects no extra goroutines after test.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-31t","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":0,"comment_count":0} -{"id":"forgejo-mcp-broker-am1","title":"Phase 4a: internal/bridge JSON-RPC pipe + SSE writer","description":"Given a supervisor.Child: inbound HTTP JSON -\u003e newline-framed stdin; stdout lines -\u003e SSE frames. Handle client disconnect without killing the child.","acceptance_criteria":"Unit tests with mock Child that echoes: request/response round trip, multiple concurrent requests with correct id routing, client disconnect mid-stream.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-am1","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:27Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":2,"comment_count":0} +{"id":"forgejo-mcp-broker-am1","title":"Phase 4a: internal/bridge JSON-RPC pipe + SSE writer","description":"Given a supervisor.Child: inbound HTTP JSON -\u003e newline-framed stdin; stdout lines -\u003e SSE frames. Handle client disconnect without killing the child.","acceptance_criteria":"Unit tests with mock Child that echoes: request/response round trip, multiple concurrent requests with correct id routing, client disconnect mid-stream.","status":"in_progress","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:56:15Z","started_at":"2026-04-27T11:56:15Z","dependencies":[{"issue_id":"forgejo-mcp-broker-am1","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:27Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":2,"comment_count":0} {"id":"forgejo-mcp-broker-wgo","title":"Phase 2e: OAuth security review + attack-path tests","description":"Phase 2 exit gate. Review every handler for classic OAuth vulns (open redirect, code replay, mix-up, token leak in logs, host spoofing). Add at least one test per attack class. Update design.md §8 with findings.","acceptance_criteria":"Review checklist documented. Tests added for: PKCE mismatch, stale code, token absent from log attributes, bad redirect_uri, mismatched state, replay of used code.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:14Z","dependencies":[{"issue_id":"forgejo-mcp-broker-wgo","depends_on_id":"forgejo-mcp-broker-b2o","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-wgo","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:25Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":0,"comment_count":0} -{"id":"forgejo-mcp-broker-zuq","title":"Phase 3a: internal/supervisor managed stdio subprocess","description":"Child type: Start, Stop(ctx) with SIGTERM -\u003e grace -\u003e SIGKILL, Wait+reap goroutine (no zombies), stderr drainer with prefix. Protocol-agnostic.","acceptance_criteria":"Unit tests against an echo-loop helper: round trip, graceful stop, kill-after-grace, child-exits-on-own detection, stderr capture. Manual spawn of real forgejo-mcp --transport stdio works.","status":"in_progress","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:32:54Z","started_at":"2026-04-27T11:32:54Z","dependency_count":0,"dependent_count":3,"comment_count":0} +{"id":"forgejo-mcp-broker-zuq","title":"Phase 3a: internal/supervisor managed stdio subprocess","description":"Child type: Start, Stop(ctx) with SIGTERM -\u003e grace -\u003e SIGKILL, Wait+reap goroutine (no zombies), stderr drainer with prefix. Protocol-agnostic.","acceptance_criteria":"Unit tests against an echo-loop helper: round trip, graceful stop, kill-after-grace, child-exits-on-own detection, stderr capture. Manual spawn of real forgejo-mcp --transport stdio works.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:14Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:41:07Z","started_at":"2026-04-27T11:32:54Z","closed_at":"2026-04-27T11:41:07Z","close_reason":"internal/supervisor shipped: Start/Stop/Done/ExitErr/Pid, SIGTERM-\u003egrace-\u003eSIGKILL escalation, mandatory wait-and-reap. Test uses TestMain helper-process pattern; coverage 89.6% on the testable surface.","dependency_count":0,"dependent_count":3,"comment_count":0} {"id":"forgejo-mcp-broker-b2o","title":"Phase 2d: OAuth discovery endpoints (/.well-known/*)","description":"GET /.well-known/oauth-protected-resource and /.well-known/oauth-authorization-server. Issuer URLs MUST derive from cfg.PublicURL, never inbound headers (host-header attack defense per design.md §8).","acceptance_criteria":"Responses validate against RFC 8414/9728 shapes. Issuer URL sourced from config only. supported_scopes matches cfg.ForgejoOAuthScopes.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:13Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:13Z","dependencies":[{"issue_id":"forgejo-mcp-broker-b2o","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:25Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0} {"id":"forgejo-mcp-broker-b9i","title":"Phase 2b: internal/forgejo OAuth client","description":"Broker-side OAuth client for upstream Forgejo: authorize URL builder, code-to-token exchange, refresh_token grant, userinfo fetch, revoke. Used by AS callback and refresh machinery. Stateless; caller owns persistence.","acceptance_criteria":"Unit tests with httptest.Server fake Forgejo cover each grant plus error paths (wrong code, expired refresh, revoked token). No state persisted in this package.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:12Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:31:27Z","started_at":"2026-04-27T11:29:17Z","closed_at":"2026-04-27T11:31:27Z","close_reason":"internal/forgejo shipped: AuthorizeURL, ExchangeCode, Refresh, FetchUserInfo. Structured *forgejo.Error for OAuth failures (errors.As-friendly). 95.1% coverage. Stateless — caller owns persistence. Revocation deferred since upstream Forgejo lacks the endpoint.","dependency_count":0,"dependent_count":1,"comment_count":0} {"id":"forgejo-mcp-broker-pur","title":"Phase 2c: internal/oauth AS endpoints (register, authorize, callback, token, revoke)","description":"Five OAuth handlers per design.md §4.1. RFC 7591 DCR with ephemeral client IDs, authorize -\u003e Forgejo delegation, callback minting broker auth codes, token exchange with SHA-256 hashing at rest, revoke. PKCE S256 required.","acceptance_criteria":"End-to-end curl walkthrough from plan.md phase 2 passes. All tokens hashed at rest. Auth codes single-use, 10-min TTL. Rejects missing PKCE, non-S256, wrong verifier, expired codes/tokens. Handler coverage \u003e=80%.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:12Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:12Z","dependencies":[{"issue_id":"forgejo-mcp-broker-pur","depends_on_id":"forgejo-mcp-broker-b9i","type":"blocks","created_at":"2026-04-24T17:45:24Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-pur","depends_on_id":"forgejo-mcp-broker-cpb","type":"blocks","created_at":"2026-04-24T17:45:24Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":5,"comment_count":0} diff --git a/internal/bridge/bridge.go b/internal/bridge/bridge.go new file mode 100644 index 0000000..bdc3d08 --- /dev/null +++ b/internal/bridge/bridge.go @@ -0,0 +1,261 @@ +// Package bridge connects an HTTP-side MCP client (e.g. Claude.ai over +// streamable HTTP) to a stdio-side MCP server (e.g. forgejo-mcp managed +// by supervisor) by piping JSON-RPC messages. +// +// The bridge is protocol-opaque: it parses each line only enough to extract +// the JSON-RPC `id` field for response routing. Everything else passes +// through verbatim. This is the design.md §5.3 "raw bytes through" +// approach — MCP is JSON-RPC 2.0 on both transports, so we can pipe +// without understanding semantics. +// +// One Bridge wraps one child. Multiple HTTP requests can be in flight +// concurrently; responses are routed back to the right HTTP handler by +// matching their `id` field against pending waiters. +package bridge + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "sync" + "sync/atomic" +) + +// Bridge owns the message routing for a single supervised child. +type Bridge struct { + stdin io.Writer + stdout *bufio.Reader + done <-chan struct{} + log *slog.Logger + + writeMu sync.Mutex // serializes writes to stdin + pending sync.Map // string(id JSON) -> chan []byte + started atomic.Bool + closed atomic.Bool +} + +// New constructs a Bridge over the supplied pipes. The caller is +// responsible for keeping stdin/stdout alive for the bridge's lifetime +// (typically by holding the supervisor.Child). +func New(stdin io.Writer, stdout *bufio.Reader, done <-chan struct{}, log *slog.Logger) *Bridge { + if log == nil { + log = slog.New(slog.DiscardHandler) + } + return &Bridge{stdin: stdin, stdout: stdout, done: done, log: log} +} + +// Start kicks off the reader goroutine. Must be called exactly once +// before HandleSSE is used; calling twice is a programmer error and is +// silently ignored. +func (b *Bridge) Start() { + if !b.started.CompareAndSwap(false, true) { + return + } + go b.readLoop() +} + +// HandleSSE forwards the request body to the child as one newline-framed +// JSON-RPC line, then streams the matching response back as a single SSE +// event. Returns when the response is delivered, the client disconnects, +// or the child exits. +// +// Errors before SSE headers go out: written as plain HTTP errors with the +// appropriate status. Errors after headers: best-effort write of an SSE +// `error` event, then return. +func (b *Bridge) HandleSSE(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "read body: "+err.Error(), http.StatusBadRequest) + return + } + if len(body) == 0 { + http.Error(w, "empty body", http.StatusBadRequest) + return + } + + idKey, ok := extractIDKey(body) + if !ok { + // No id → notification. Forward without waiting; reply 204. + // (MCP notifications don't expect a response.) + if err := b.send(body); err != nil { + http.Error(w, "send: "+err.Error(), http.StatusBadGateway) + return + } + w.WriteHeader(http.StatusNoContent) + return + } + + // Register a waiter before sending so a fast child can't reply before + // we're ready to receive. + respCh := make(chan []byte, 1) + if _, loaded := b.pending.LoadOrStore(idKey, respCh); loaded { + http.Error(w, "duplicate in-flight id", http.StatusConflict) + return + } + defer b.pending.Delete(idKey) + + if err := b.send(body); err != nil { + http.Error(w, "send: "+err.Error(), http.StatusBadGateway) + return + } + + // SSE headers must go out before the first event. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + if flusher != nil { + flusher.Flush() + } + + select { + case resp := <-respCh: + writeSSEEvent(w, "message", resp) + if flusher != nil { + flusher.Flush() + } + case <-r.Context().Done(): + // Client disconnected. Just return — child keeps running, future + // requests on the same bridge are unaffected. + b.log.Debug("client disconnected mid-stream", slog.String("id", idKey)) + case <-b.done: + writeSSEEvent(w, "error", []byte(`{"reason":"child_exited"}`)) + if flusher != nil { + flusher.Flush() + } + } +} + +// send writes one newline-framed message to the child. Concurrent writes +// to the underlying pipe are not safe (interleaved bytes), so we serialize +// here. +func (b *Bridge) send(msg []byte) error { + b.writeMu.Lock() + defer b.writeMu.Unlock() + if _, err := b.stdin.Write(msg); err != nil { + return err + } + if len(msg) == 0 || msg[len(msg)-1] != '\n' { + if _, err := b.stdin.Write([]byte{'\n'}); err != nil { + return err + } + } + return nil +} + +// readLoop reads one JSON-RPC message per line from the child's stdout +// and routes each to the waiting handler (if any). Lines without an id +// (server-initiated notifications) are dropped on the floor for now — +// phase 5 introduces the GET-stream channel for those. +func (b *Bridge) readLoop() { + for { + line, err := b.stdout.ReadString('\n') + if line != "" { + b.routeResponse(line) + } + if err != nil { + if errors.Is(err, io.EOF) { + b.log.Debug("child stdout closed; bridge reader exiting") + } else { + b.log.Warn("bridge reader error", slog.String("err", err.Error())) + } + b.closed.Store(true) + // Wake any pending waiters so handlers don't hang forever after + // the child dies. + b.pending.Range(func(k, v any) bool { + if ch, ok := v.(chan []byte); ok { + select { + case <-ch: + default: + close(ch) + } + } + return true + }) + return + } + } +} + +func (b *Bridge) routeResponse(line string) { + idKey, ok := extractIDKey([]byte(line)) + if !ok { + // Server-initiated notification or malformed message — log and skip. + b.log.Debug("bridge: dropping un-routable line", slog.Int("len", len(line))) + return + } + v, ok := b.pending.LoadAndDelete(idKey) + if !ok { + // No waiter — request was likely abandoned (client disconnected). + // Logging at debug; this is normal. + b.log.Debug("bridge: response with no waiter", slog.String("id", idKey)) + return + } + ch, _ := v.(chan []byte) + if ch == nil { + return + } + // Non-blocking send: the caller created the channel with buffer 1, so + // this is guaranteed to succeed unless we're racing a Close. + select { + case ch <- []byte(line): + default: + } +} + +// extractIDKey parses the JSON-RPC `id` field and returns its raw JSON +// representation as a routing key. Returns ok=false when the message has +// no id (notification) or fails to parse. +// +// JSON-RPC ids are string, number, or null. Comparing the raw JSON token +// avoids quirks like distinguishing 1 from "1". +func extractIDKey(data []byte) (string, bool) { + var probe struct { + ID json.RawMessage `json:"id"` + } + if err := json.Unmarshal(data, &probe); err != nil { + return "", false + } + if len(probe.ID) == 0 || string(probe.ID) == "null" { + return "", false + } + return string(probe.ID), true +} + +// writeSSEEvent writes one Server-Sent Event to w. The data may be +// multi-line — we re-frame it per the SSE spec (each \n becomes a +// separate `data:` line). For our typical single-line JSON, this is a +// no-op. +func writeSSEEvent(w io.Writer, event string, data []byte) { + fmt.Fprintf(w, "event: %s\n", event) + for _, line := range splitLines(data) { + fmt.Fprintf(w, "data: %s\n", line) + } + _, _ = w.Write([]byte("\n")) +} + +func splitLines(data []byte) []string { + // Strip a single trailing newline (typical for child output). + if n := len(data); n > 0 && data[n-1] == '\n' { + data = data[:n-1] + } + if len(data) == 0 { + return []string{""} + } + out := []string{} + start := 0 + for i, b := range data { + if b == '\n' { + out = append(out, string(data[start:i])) + start = i + 1 + } + } + out = append(out, string(data[start:])) + return out +} + diff --git a/internal/bridge/bridge_test.go b/internal/bridge/bridge_test.go new file mode 100644 index 0000000..4d5fe3b --- /dev/null +++ b/internal/bridge/bridge_test.go @@ -0,0 +1,440 @@ +package bridge_test + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "kode.naiv.no/olemd/forgejo-mcp-broker/internal/bridge" + brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log" +) + +// fakeChild simulates a supervised stdio subprocess via io.Pipe pairs. +// Test code writes to stdoutW (acting as the child sending output) and +// reads from stdinR (acting as the child receiving input). +type fakeChild struct { + stdinR *io.PipeReader + stdinW *io.PipeWriter + stdoutR *io.PipeReader + stdoutW *io.PipeWriter + done chan struct{} +} + +func newFakeChild() *fakeChild { + stdinR, stdinW := io.Pipe() + stdoutR, stdoutW := io.Pipe() + return &fakeChild{ + stdinR: stdinR, + stdinW: stdinW, + stdoutR: stdoutR, + stdoutW: stdoutW, + done: make(chan struct{}), + } +} + +// kill closes the stdout pipe to simulate the child exiting. +func (f *fakeChild) kill() { + _ = f.stdoutW.Close() + close(f.done) +} + +// readChildStdin reads one newline-terminated line from what the bridge +// sent to the child. +func (f *fakeChild) readChildStdin(t *testing.T) string { + t.Helper() + br := bufio.NewReader(f.stdinR) + line, err := br.ReadString('\n') + if err != nil { + t.Fatalf("read child stdin: %v", err) + } + return line +} + +// writeChildStdout writes a line as if the child produced it. Adds the +// trailing newline if missing. +func (f *fakeChild) writeChildStdout(t *testing.T, line string) { + t.Helper() + if !strings.HasSuffix(line, "\n") { + line += "\n" + } + if _, err := io.WriteString(f.stdoutW, line); err != nil { + t.Fatalf("write child stdout: %v", err) + } +} + +func newBridge(t *testing.T, fc *fakeChild) *bridge.Bridge { + t.Helper() + b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard()) + b.Start() + t.Cleanup(func() { + _ = fc.stdoutW.Close() + _ = fc.stdinR.Close() + }) + return b +} + +func TestHandleSSE_RoundTrip(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + // Drive the child side: read what the bridge sends, reply with a + // JSON-RPC response carrying the same id. + go func() { + _ = fc.readChildStdin(t) + fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":{"ok":true}}`) + }() + + resp, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) + if err != nil { + t.Fatalf("POST: %v", err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Errorf("status = %d, want 200; body: %s", resp.StatusCode, body) + } + if ct := resp.Header.Get("Content-Type"); ct != "text/event-stream" { + t.Errorf("Content-Type = %q, want text/event-stream", ct) + } + got := string(body) + if !strings.Contains(got, "event: message") { + t.Errorf("body missing message event: %q", got) + } + if !strings.Contains(got, `"result":{"ok":true}`) { + t.Errorf("body missing JSON-RPC payload: %q", got) + } +} + +func TestHandleSSE_ConcurrentRequests_RoutedById(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + // Driver: read incoming messages from the bridge in lockstep and + // reply to each by id. + var seen sync.Map + go func() { + br := bufio.NewReader(fc.stdinR) + for { + line, err := br.ReadString('\n') + if err != nil { + return + } + // Trivial id extraction: search for "id":N. Sufficient for the + // known message shape used in this test. + id := extractTestID(line) + seen.Store(id, true) + fc.writeChildStdout(t, fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`+"\n", id, id)) + } + }() + + // Fire 10 concurrent requests with distinct ids. + const N = 10 + var wg sync.WaitGroup + results := make([]string, N) + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + body := fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"echo"}`, i) + resp, err := http.Post(srv.URL, "application/json", strings.NewReader(body)) + if err != nil { + t.Errorf("POST id=%d: %v", i, err) + return + } + b, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + results[i] = string(b) + }(i) + } + wg.Wait() + + for i, body := range results { + want := fmt.Sprintf(`"result":%d`, i) + if !strings.Contains(body, want) { + t.Errorf("id=%d body missing %q, got: %s", i, want, body) + } + } +} + +func TestHandleSSE_NotificationReturns204(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + // Drain the child's stdin so the write doesn't block. + gotLine := make(chan string, 1) + go func() { gotLine <- fc.readChildStdin(t) }() + + resp, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","method":"notify","params":{}}`)) + if err != nil { + t.Fatalf("POST: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + t.Errorf("status = %d, want 204", resp.StatusCode) + } + select { + case line := <-gotLine: + if !strings.Contains(line, `"method":"notify"`) { + t.Errorf("forwarded line missing method: %q", line) + } + case <-time.After(time.Second): + t.Error("notification was not forwarded to child") + } +} + +func TestHandleSSE_EmptyBody_400(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + resp, err := http.Post(srv.URL, "application/json", strings.NewReader("")) + if err != nil { + t.Fatalf("POST: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("status = %d, want 400", resp.StatusCode) + } +} + +func TestHandleSSE_DuplicateInflightID_409(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + // First request blocks waiting for child reply (we never send one). + first := make(chan *http.Response, 1) + firstCtx, firstCancel := context.WithCancel(t.Context()) + defer firstCancel() + go func() { + req, _ := http.NewRequestWithContext(firstCtx, http.MethodPost, srv.URL, + strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`)) + resp, _ := http.DefaultClient.Do(req) + first <- resp + }() + + // Drain the child stdin so the first request can register its waiter. + go func() { _ = fc.readChildStdin(t) }() + + // Wait briefly for the first request's waiter to register. + time.Sleep(100 * time.Millisecond) + + // Second request with same id must 409. + resp, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`)) + if err != nil { + t.Fatalf("second POST: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + t.Errorf("status = %d, want 409", resp.StatusCode) + } + + // Cleanup: cancel the first request so it doesn't leak. + firstCancel() + <-first +} + +func TestHandleSSE_ClientDisconnectMidStream_DoesNotKillChild(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + // Drain child stdin so the bridge can register the waiter. + go func() { _ = fc.readChildStdin(t) }() + + ctx, cancel := context.WithCancel(t.Context()) + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, srv.URL, + strings.NewReader(`{"jsonrpc":"2.0","id":42,"method":"slow"}`)) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("POST: %v", err) + } + // Disconnect immediately. + _ = resp.Body.Close() + cancel() + + // Bridge should still be functional for a new request. + go func() { + _ = fc.readChildStdin(t) + fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"alive"}`) + }() + + resp2, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":99,"method":"ping"}`)) + if err != nil { + t.Fatalf("second POST after disconnect: %v", err) + } + body, _ := io.ReadAll(resp2.Body) + _ = resp2.Body.Close() + if !strings.Contains(string(body), `"result":"alive"`) { + t.Errorf("post-disconnect request did not get response: %s", body) + } +} + +func TestHandleSSE_ChildExits_SendsErrorEvent(t *testing.T) { + fc := newFakeChild() + b := newBridge(t, fc) + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + go func() { + _ = fc.readChildStdin(t) + // Don't reply; just kill the child. + time.Sleep(50 * time.Millisecond) + fc.kill() + }() + + resp, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":7,"method":"will_fail"}`)) + if err != nil { + t.Fatalf("POST: %v", err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + + if !strings.Contains(string(body), "event: error") { + t.Errorf("body should contain error event when child exits: %s", body) + } + if !strings.Contains(string(body), "child_exited") { + t.Errorf("body should mention child_exited: %s", body) + } +} + +func TestHandleSSE_StartTwice_NoOp(t *testing.T) { + // Calling Start twice must not panic or spawn a second reader. + fc := newFakeChild() + b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard()) + b.Start() + b.Start() // should be a no-op + t.Cleanup(func() { + _ = fc.stdoutW.Close() + _ = fc.stdinR.Close() + }) +} + +func TestReadLoop_UnsolicitedResponseDropped(t *testing.T) { + // Stale/unsolicited response with an id we never sent must not crash + // the reader; subsequent in-flight requests should still work. + fc := newFakeChild() + b := newBridge(t, fc) + + fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"stale"}`) + // Give the reader a tick to consume the unsolicited line. + time.Sleep(50 * time.Millisecond) + + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + go func() { + _ = fc.readChildStdin(t) + fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"after"}`) + }() + resp, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) + if err != nil { + t.Fatalf("POST: %v", err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if !strings.Contains(string(body), `"result":"after"`) { + t.Errorf("stale unsolicited response broke later request: %s", body) + } +} + +func TestReadLoop_MalformedLinesDropped(t *testing.T) { + // Garbage from the child (not JSON, or JSON without id) is silently + // dropped — must not break the reader. + fc := newFakeChild() + b := newBridge(t, fc) + + fc.writeChildStdout(t, "not json at all") + fc.writeChildStdout(t, `{"jsonrpc":"2.0","method":"server-notification"}`) // no id + time.Sleep(50 * time.Millisecond) + + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + go func() { + _ = fc.readChildStdin(t) + fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"valid"}`) + }() + resp, _ := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if !strings.Contains(string(body), `"result":"valid"`) { + t.Errorf("malformed lines broke subsequent request: %s", body) + } +} + +func TestNew_NilLogger_NoPanic(t *testing.T) { + fc := newFakeChild() + b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, nil) + b.Start() + t.Cleanup(func() { + _ = fc.stdoutW.Close() + _ = fc.stdinR.Close() + }) +} + +func TestHandleSSE_SendError_BadGateway(t *testing.T) { + // If writing to the child fails (e.g. broken pipe), the handler must + // surface a 502 rather than appear to succeed. + fc := newFakeChild() + b := newBridge(t, fc) + srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) + t.Cleanup(srv.Close) + + // Close the read end of the stdin pipe — bridge's writes will hit + // io.ErrClosedPipe. + _ = fc.stdinR.Close() + + resp, err := http.Post(srv.URL, "application/json", + strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) + if err != nil { + t.Fatalf("POST: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusBadGateway { + t.Errorf("status = %d, want 502", resp.StatusCode) + } +} + +// extractTestID is a very small JSON probe — it returns the substring +// after `"id":` and before the first comma or closing brace. Sufficient +// for the controlled message shape used in the concurrent-requests test. +func extractTestID(line string) string { + idx := strings.Index(line, `"id":`) + if idx < 0 { + return "" + } + rest := line[idx+5:] + end := strings.IndexAny(rest, ",}") + if end < 0 { + return "" + } + return strings.TrimSpace(rest[:end]) +}