package bridge_test import ( "bufio" "context" "fmt" "io" "net/http" "net/http/httptest" "strings" "sync" "testing" "time" "kode.naiv.no/olemd/forgejo-mcp-broker/internal/bridge" brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log" ) // fakeChild simulates a supervised stdio subprocess via io.Pipe pairs. // Test code writes to stdoutW (acting as the child sending output) and // reads from stdinR (acting as the child receiving input). type fakeChild struct { stdinR *io.PipeReader stdinW *io.PipeWriter stdoutR *io.PipeReader stdoutW *io.PipeWriter done chan struct{} } func newFakeChild() *fakeChild { stdinR, stdinW := io.Pipe() stdoutR, stdoutW := io.Pipe() return &fakeChild{ stdinR: stdinR, stdinW: stdinW, stdoutR: stdoutR, stdoutW: stdoutW, done: make(chan struct{}), } } // kill closes the stdout pipe to simulate the child exiting. func (f *fakeChild) kill() { _ = f.stdoutW.Close() close(f.done) } // readChildStdin reads one newline-terminated line from what the bridge // sent to the child. func (f *fakeChild) readChildStdin(t *testing.T) string { t.Helper() br := bufio.NewReader(f.stdinR) line, err := br.ReadString('\n') if err != nil { t.Fatalf("read child stdin: %v", err) } return line } // writeChildStdout writes a line as if the child produced it. Adds the // trailing newline if missing. func (f *fakeChild) writeChildStdout(t *testing.T, line string) { t.Helper() if !strings.HasSuffix(line, "\n") { line += "\n" } if _, err := io.WriteString(f.stdoutW, line); err != nil { t.Fatalf("write child stdout: %v", err) } } func newBridge(t *testing.T, fc *fakeChild) *bridge.Bridge { t.Helper() b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard()) b.Start() t.Cleanup(func() { _ = fc.stdoutW.Close() _ = fc.stdinR.Close() }) return b } func TestHandleSSE_RoundTrip(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) // Drive the child side: read what the bridge sends, reply with a // JSON-RPC response carrying the same id. go func() { _ = fc.readChildStdin(t) fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":{"ok":true}}`) }() resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) if err != nil { t.Fatalf("POST: %v", err) } body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Errorf("status = %d, want 200; body: %s", resp.StatusCode, body) } if ct := resp.Header.Get("Content-Type"); ct != "text/event-stream" { t.Errorf("Content-Type = %q, want text/event-stream", ct) } got := string(body) if !strings.Contains(got, "event: message") { t.Errorf("body missing message event: %q", got) } if !strings.Contains(got, `"result":{"ok":true}`) { t.Errorf("body missing JSON-RPC payload: %q", got) } } func TestHandleSSE_ConcurrentRequests_RoutedById(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) // Driver: read incoming messages from the bridge in lockstep and // reply to each by id. var seen sync.Map go func() { br := bufio.NewReader(fc.stdinR) for { line, err := br.ReadString('\n') if err != nil { return } // Trivial id extraction: search for "id":N. Sufficient for the // known message shape used in this test. id := extractTestID(line) seen.Store(id, true) fc.writeChildStdout(t, fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`+"\n", id, id)) } }() // Fire 10 concurrent requests with distinct ids. const N = 10 var wg sync.WaitGroup results := make([]string, N) for i := 0; i < N; i++ { wg.Add(1) go func(i int) { defer wg.Done() body := fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"echo"}`, i) resp, err := http.Post(srv.URL, "application/json", strings.NewReader(body)) if err != nil { t.Errorf("POST id=%d: %v", i, err) return } b, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() results[i] = string(b) }(i) } wg.Wait() for i, body := range results { want := fmt.Sprintf(`"result":%d`, i) if !strings.Contains(body, want) { t.Errorf("id=%d body missing %q, got: %s", i, want, body) } } } func TestHandleSSE_NotificationReturns204(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) // Drain the child's stdin so the write doesn't block. gotLine := make(chan string, 1) go func() { gotLine <- fc.readChildStdin(t) }() resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","method":"notify","params":{}}`)) if err != nil { t.Fatalf("POST: %v", err) } _ = resp.Body.Close() if resp.StatusCode != http.StatusNoContent { t.Errorf("status = %d, want 204", resp.StatusCode) } select { case line := <-gotLine: if !strings.Contains(line, `"method":"notify"`) { t.Errorf("forwarded line missing method: %q", line) } case <-time.After(time.Second): t.Error("notification was not forwarded to child") } } func TestHandleSSE_EmptyBody_400(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) resp, err := http.Post(srv.URL, "application/json", strings.NewReader("")) if err != nil { t.Fatalf("POST: %v", err) } _ = resp.Body.Close() if resp.StatusCode != http.StatusBadRequest { t.Errorf("status = %d, want 400", resp.StatusCode) } } func TestHandleSSE_DuplicateInflightID_409(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) // First request blocks waiting for child reply (we never send one). first := make(chan *http.Response, 1) firstCtx, firstCancel := context.WithCancel(t.Context()) defer firstCancel() go func() { req, _ := http.NewRequestWithContext(firstCtx, http.MethodPost, srv.URL, strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`)) resp, _ := http.DefaultClient.Do(req) first <- resp }() // Drain the child stdin so the first request can register its waiter. go func() { _ = fc.readChildStdin(t) }() // Wait briefly for the first request's waiter to register. time.Sleep(100 * time.Millisecond) // Second request with same id must 409. resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`)) if err != nil { t.Fatalf("second POST: %v", err) } _ = resp.Body.Close() if resp.StatusCode != http.StatusConflict { t.Errorf("status = %d, want 409", resp.StatusCode) } // Cleanup: cancel the first request so it doesn't leak. firstCancel() <-first } func TestHandleSSE_ClientDisconnectMidStream_DoesNotKillChild(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) // Drain child stdin so the bridge can register the waiter. go func() { _ = fc.readChildStdin(t) }() ctx, cancel := context.WithCancel(t.Context()) req, _ := http.NewRequestWithContext(ctx, http.MethodPost, srv.URL, strings.NewReader(`{"jsonrpc":"2.0","id":42,"method":"slow"}`)) resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("POST: %v", err) } // Disconnect immediately. _ = resp.Body.Close() cancel() // Bridge should still be functional for a new request. go func() { _ = fc.readChildStdin(t) fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"alive"}`) }() resp2, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":99,"method":"ping"}`)) if err != nil { t.Fatalf("second POST after disconnect: %v", err) } body, _ := io.ReadAll(resp2.Body) _ = resp2.Body.Close() if !strings.Contains(string(body), `"result":"alive"`) { t.Errorf("post-disconnect request did not get response: %s", body) } } func TestHandleSSE_ChildExits_SendsErrorEvent(t *testing.T) { fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) go func() { _ = fc.readChildStdin(t) // Don't reply; just kill the child. time.Sleep(50 * time.Millisecond) fc.kill() }() resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":7,"method":"will_fail"}`)) if err != nil { t.Fatalf("POST: %v", err) } body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() if !strings.Contains(string(body), "event: error") { t.Errorf("body should contain error event when child exits: %s", body) } if !strings.Contains(string(body), "child_exited") { t.Errorf("body should mention child_exited: %s", body) } } func TestHandleSSE_StartTwice_NoOp(t *testing.T) { // Calling Start twice must not panic or spawn a second reader. fc := newFakeChild() b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard()) b.Start() b.Start() // should be a no-op t.Cleanup(func() { _ = fc.stdoutW.Close() _ = fc.stdinR.Close() }) } func TestReadLoop_UnsolicitedResponseDropped(t *testing.T) { // Stale/unsolicited response with an id we never sent must not crash // the reader; subsequent in-flight requests should still work. fc := newFakeChild() b := newBridge(t, fc) fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"stale"}`) // Give the reader a tick to consume the unsolicited line. time.Sleep(50 * time.Millisecond) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) go func() { _ = fc.readChildStdin(t) fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"after"}`) }() resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) if err != nil { t.Fatalf("POST: %v", err) } body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() if !strings.Contains(string(body), `"result":"after"`) { t.Errorf("stale unsolicited response broke later request: %s", body) } } func TestReadLoop_MalformedLinesDropped(t *testing.T) { // Garbage from the child (not JSON, or JSON without id) is silently // dropped — must not break the reader. fc := newFakeChild() b := newBridge(t, fc) fc.writeChildStdout(t, "not json at all") fc.writeChildStdout(t, `{"jsonrpc":"2.0","method":"server-notification"}`) // no id time.Sleep(50 * time.Millisecond) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) go func() { _ = fc.readChildStdin(t) fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"valid"}`) }() resp, _ := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() if !strings.Contains(string(body), `"result":"valid"`) { t.Errorf("malformed lines broke subsequent request: %s", body) } } func TestNew_NilLogger_NoPanic(t *testing.T) { fc := newFakeChild() b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, nil) b.Start() t.Cleanup(func() { _ = fc.stdoutW.Close() _ = fc.stdinR.Close() }) } func TestHandleSSE_SendError_BadGateway(t *testing.T) { // If writing to the child fails (e.g. broken pipe), the handler must // surface a 502 rather than appear to succeed. fc := newFakeChild() b := newBridge(t, fc) srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE)) t.Cleanup(srv.Close) // Close the read end of the stdin pipe — bridge's writes will hit // io.ErrClosedPipe. _ = fc.stdinR.Close() resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`)) if err != nil { t.Fatalf("POST: %v", err) } _ = resp.Body.Close() if resp.StatusCode != http.StatusBadGateway { t.Errorf("status = %d, want 502", resp.StatusCode) } } // extractTestID is a very small JSON probe — it returns the substring // after `"id":` and before the first comma or closing brace. Sufficient // for the controlled message shape used in the concurrent-requests test. func extractTestID(line string) string { idx := strings.Index(line, `"id":`) if idx < 0 { return "" } rest := line[idx+5:] end := strings.IndexAny(rest, ",}") if end < 0 { return "" } return strings.TrimSpace(rest[:end]) }