forgejo-mcp-broker/internal/bridge/bridge_test.go

440 lines
12 KiB
Go
Raw Permalink Normal View History

feat(bridge): JSON-RPC pipe + SSE writer (forgejo-mcp-broker-am1) Adds internal/bridge: connects HTTP-side MCP clients to a stdio-side child via JSON-RPC framing. Decoupled from internal/supervisor — takes io.Writer + *bufio.Reader + done channel directly so it tests cleanly with io.Pipe pairs and could later wrap something other than a child process. Routing model: one reader goroutine consumes child stdout line-by-line. Each line is parsed only enough to extract the JSON-RPC `id` field (string/number/null kept as raw JSON, so `1` and `"1"` don't collide). HTTP requests register a per-id waiter channel before forwarding their body to the child; the reader delivers the response to whichever waiter matches. Concurrent in-flight requests are safe; a duplicate id while the first is still pending returns 409. HandleSSE response shapes: - request with id + child reply → 200 text/event-stream, one `event: message` SSE event carrying the JSON-RPC response - request without id (notification) → 204 No Content (no waiter needed; MCP notifications are fire-and-forget) - empty body → 400 - duplicate in-flight id → 409 - send-to-child fails → 502 - client disconnect mid-wait → bridge cleans up its waiter; child keeps running, other in-flight requests unaffected - child exits before reply → SSE `error` event with reason=child_exited Tests cover all of the above plus stale unsolicited replies, malformed lines from the child, and reader robustness across both. 90.0% coverage. The remaining gap is splitLines' empty-data branch (only reachable if the child sends a literal `\n` line). Closes forgejo-mcp-broker-am1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:59:28 +02:00
package bridge_test
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/bridge"
brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log"
)
// fakeChild simulates a supervised stdio subprocess via io.Pipe pairs.
// Test code writes to stdoutW (acting as the child sending output) and
// reads from stdinR (acting as the child receiving input).
type fakeChild struct {
stdinR *io.PipeReader
stdinW *io.PipeWriter
stdoutR *io.PipeReader
stdoutW *io.PipeWriter
done chan struct{}
}
func newFakeChild() *fakeChild {
stdinR, stdinW := io.Pipe()
stdoutR, stdoutW := io.Pipe()
return &fakeChild{
stdinR: stdinR,
stdinW: stdinW,
stdoutR: stdoutR,
stdoutW: stdoutW,
done: make(chan struct{}),
}
}
// kill closes the stdout pipe to simulate the child exiting.
func (f *fakeChild) kill() {
_ = f.stdoutW.Close()
close(f.done)
}
// readChildStdin reads one newline-terminated line from what the bridge
// sent to the child.
func (f *fakeChild) readChildStdin(t *testing.T) string {
t.Helper()
br := bufio.NewReader(f.stdinR)
line, err := br.ReadString('\n')
if err != nil {
t.Fatalf("read child stdin: %v", err)
}
return line
}
// writeChildStdout writes a line as if the child produced it. Adds the
// trailing newline if missing.
func (f *fakeChild) writeChildStdout(t *testing.T, line string) {
t.Helper()
if !strings.HasSuffix(line, "\n") {
line += "\n"
}
if _, err := io.WriteString(f.stdoutW, line); err != nil {
t.Fatalf("write child stdout: %v", err)
}
}
func newBridge(t *testing.T, fc *fakeChild) *bridge.Bridge {
t.Helper()
b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard())
b.Start()
t.Cleanup(func() {
_ = fc.stdoutW.Close()
_ = fc.stdinR.Close()
})
return b
}
func TestHandleSSE_RoundTrip(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
// Drive the child side: read what the bridge sends, reply with a
// JSON-RPC response carrying the same id.
go func() {
_ = fc.readChildStdin(t)
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":{"ok":true}}`)
}()
resp, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
if err != nil {
t.Fatalf("POST: %v", err)
}
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("status = %d, want 200; body: %s", resp.StatusCode, body)
}
if ct := resp.Header.Get("Content-Type"); ct != "text/event-stream" {
t.Errorf("Content-Type = %q, want text/event-stream", ct)
}
got := string(body)
if !strings.Contains(got, "event: message") {
t.Errorf("body missing message event: %q", got)
}
if !strings.Contains(got, `"result":{"ok":true}`) {
t.Errorf("body missing JSON-RPC payload: %q", got)
}
}
func TestHandleSSE_ConcurrentRequests_RoutedById(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
// Driver: read incoming messages from the bridge in lockstep and
// reply to each by id.
var seen sync.Map
go func() {
br := bufio.NewReader(fc.stdinR)
for {
line, err := br.ReadString('\n')
if err != nil {
return
}
// Trivial id extraction: search for "id":N. Sufficient for the
// known message shape used in this test.
id := extractTestID(line)
seen.Store(id, true)
fc.writeChildStdout(t, fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`+"\n", id, id))
}
}()
// Fire 10 concurrent requests with distinct ids.
const N = 10
var wg sync.WaitGroup
results := make([]string, N)
for i := 0; i < N; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
body := fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"echo"}`, i)
resp, err := http.Post(srv.URL, "application/json", strings.NewReader(body))
if err != nil {
t.Errorf("POST id=%d: %v", i, err)
return
}
b, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
results[i] = string(b)
}(i)
}
wg.Wait()
for i, body := range results {
want := fmt.Sprintf(`"result":%d`, i)
if !strings.Contains(body, want) {
t.Errorf("id=%d body missing %q, got: %s", i, want, body)
}
}
}
func TestHandleSSE_NotificationReturns204(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
// Drain the child's stdin so the write doesn't block.
gotLine := make(chan string, 1)
go func() { gotLine <- fc.readChildStdin(t) }()
resp, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","method":"notify","params":{}}`))
if err != nil {
t.Fatalf("POST: %v", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
t.Errorf("status = %d, want 204", resp.StatusCode)
}
select {
case line := <-gotLine:
if !strings.Contains(line, `"method":"notify"`) {
t.Errorf("forwarded line missing method: %q", line)
}
case <-time.After(time.Second):
t.Error("notification was not forwarded to child")
}
}
func TestHandleSSE_EmptyBody_400(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
resp, err := http.Post(srv.URL, "application/json", strings.NewReader(""))
if err != nil {
t.Fatalf("POST: %v", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Errorf("status = %d, want 400", resp.StatusCode)
}
}
func TestHandleSSE_DuplicateInflightID_409(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
// First request blocks waiting for child reply (we never send one).
first := make(chan *http.Response, 1)
firstCtx, firstCancel := context.WithCancel(t.Context())
defer firstCancel()
go func() {
req, _ := http.NewRequestWithContext(firstCtx, http.MethodPost, srv.URL,
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`))
resp, _ := http.DefaultClient.Do(req)
first <- resp
}()
// Drain the child stdin so the first request can register its waiter.
go func() { _ = fc.readChildStdin(t) }()
// Wait briefly for the first request's waiter to register.
time.Sleep(100 * time.Millisecond)
// Second request with same id must 409.
resp, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`))
if err != nil {
t.Fatalf("second POST: %v", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusConflict {
t.Errorf("status = %d, want 409", resp.StatusCode)
}
// Cleanup: cancel the first request so it doesn't leak.
firstCancel()
<-first
}
func TestHandleSSE_ClientDisconnectMidStream_DoesNotKillChild(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
// Drain child stdin so the bridge can register the waiter.
go func() { _ = fc.readChildStdin(t) }()
ctx, cancel := context.WithCancel(t.Context())
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, srv.URL,
strings.NewReader(`{"jsonrpc":"2.0","id":42,"method":"slow"}`))
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("POST: %v", err)
}
// Disconnect immediately.
_ = resp.Body.Close()
cancel()
// Bridge should still be functional for a new request.
go func() {
_ = fc.readChildStdin(t)
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"alive"}`)
}()
resp2, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":99,"method":"ping"}`))
if err != nil {
t.Fatalf("second POST after disconnect: %v", err)
}
body, _ := io.ReadAll(resp2.Body)
_ = resp2.Body.Close()
if !strings.Contains(string(body), `"result":"alive"`) {
t.Errorf("post-disconnect request did not get response: %s", body)
}
}
func TestHandleSSE_ChildExits_SendsErrorEvent(t *testing.T) {
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
go func() {
_ = fc.readChildStdin(t)
// Don't reply; just kill the child.
time.Sleep(50 * time.Millisecond)
fc.kill()
}()
resp, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":7,"method":"will_fail"}`))
if err != nil {
t.Fatalf("POST: %v", err)
}
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if !strings.Contains(string(body), "event: error") {
t.Errorf("body should contain error event when child exits: %s", body)
}
if !strings.Contains(string(body), "child_exited") {
t.Errorf("body should mention child_exited: %s", body)
}
}
func TestHandleSSE_StartTwice_NoOp(t *testing.T) {
// Calling Start twice must not panic or spawn a second reader.
fc := newFakeChild()
b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard())
b.Start()
b.Start() // should be a no-op
t.Cleanup(func() {
_ = fc.stdoutW.Close()
_ = fc.stdinR.Close()
})
}
func TestReadLoop_UnsolicitedResponseDropped(t *testing.T) {
// Stale/unsolicited response with an id we never sent must not crash
// the reader; subsequent in-flight requests should still work.
fc := newFakeChild()
b := newBridge(t, fc)
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"stale"}`)
// Give the reader a tick to consume the unsolicited line.
time.Sleep(50 * time.Millisecond)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
go func() {
_ = fc.readChildStdin(t)
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"after"}`)
}()
resp, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
if err != nil {
t.Fatalf("POST: %v", err)
}
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if !strings.Contains(string(body), `"result":"after"`) {
t.Errorf("stale unsolicited response broke later request: %s", body)
}
}
func TestReadLoop_MalformedLinesDropped(t *testing.T) {
// Garbage from the child (not JSON, or JSON without id) is silently
// dropped — must not break the reader.
fc := newFakeChild()
b := newBridge(t, fc)
fc.writeChildStdout(t, "not json at all")
fc.writeChildStdout(t, `{"jsonrpc":"2.0","method":"server-notification"}`) // no id
time.Sleep(50 * time.Millisecond)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
go func() {
_ = fc.readChildStdin(t)
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"valid"}`)
}()
resp, _ := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if !strings.Contains(string(body), `"result":"valid"`) {
t.Errorf("malformed lines broke subsequent request: %s", body)
}
}
func TestNew_NilLogger_NoPanic(t *testing.T) {
fc := newFakeChild()
b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, nil)
b.Start()
t.Cleanup(func() {
_ = fc.stdoutW.Close()
_ = fc.stdinR.Close()
})
}
func TestHandleSSE_SendError_BadGateway(t *testing.T) {
// If writing to the child fails (e.g. broken pipe), the handler must
// surface a 502 rather than appear to succeed.
fc := newFakeChild()
b := newBridge(t, fc)
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
t.Cleanup(srv.Close)
// Close the read end of the stdin pipe — bridge's writes will hit
// io.ErrClosedPipe.
_ = fc.stdinR.Close()
resp, err := http.Post(srv.URL, "application/json",
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
if err != nil {
t.Fatalf("POST: %v", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusBadGateway {
t.Errorf("status = %d, want 502", resp.StatusCode)
}
}
// extractTestID is a very small JSON probe — it returns the substring
// after `"id":` and before the first comma or closing brace. Sufficient
// for the controlled message shape used in the concurrent-requests test.
func extractTestID(line string) string {
idx := strings.Index(line, `"id":`)
if idx < 0 {
return ""
}
rest := line[idx+5:]
end := strings.IndexAny(rest, ",}")
if end < 0 {
return ""
}
return strings.TrimSpace(rest[:end])
}