feat(bridge): JSON-RPC pipe + SSE writer (forgejo-mcp-broker-am1)
Adds internal/bridge: connects HTTP-side MCP clients to a stdio-side
child via JSON-RPC framing. Decoupled from internal/supervisor — takes
io.Writer + *bufio.Reader + done channel directly so it tests cleanly
with io.Pipe pairs and could later wrap something other than a child
process.
Routing model: one reader goroutine consumes child stdout line-by-line.
Each line is parsed only enough to extract the JSON-RPC `id` field
(string/number/null kept as raw JSON, so `1` and `"1"` don't collide).
HTTP requests register a per-id waiter channel before forwarding their
body to the child; the reader delivers the response to whichever waiter
matches. Concurrent in-flight requests are safe; a duplicate id while
the first is still pending returns 409.
HandleSSE response shapes:
- request with id + child reply → 200 text/event-stream, one
`event: message` SSE event carrying the JSON-RPC response
- request without id (notification) → 204 No Content (no waiter
needed; MCP notifications are fire-and-forget)
- empty body → 400
- duplicate in-flight id → 409
- send-to-child fails → 502
- client disconnect mid-wait → bridge cleans up its waiter; child
keeps running, other in-flight requests unaffected
- child exits before reply → SSE `error` event with reason=child_exited
Tests cover all of the above plus stale unsolicited replies, malformed
lines from the child, and reader robustness across both. 90.0%
coverage. The remaining gap is splitLines' empty-data branch (only
reachable if the child sends a literal `\n` line).
Closes forgejo-mcp-broker-am1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
7be7f5e199
commit
bd68d7ed06
3 changed files with 703 additions and 2 deletions
261
internal/bridge/bridge.go
Normal file
261
internal/bridge/bridge.go
Normal file
|
|
@ -0,0 +1,261 @@
|
|||
// Package bridge connects an HTTP-side MCP client (e.g. Claude.ai over
|
||||
// streamable HTTP) to a stdio-side MCP server (e.g. forgejo-mcp managed
|
||||
// by supervisor) by piping JSON-RPC messages.
|
||||
//
|
||||
// The bridge is protocol-opaque: it parses each line only enough to extract
|
||||
// the JSON-RPC `id` field for response routing. Everything else passes
|
||||
// through verbatim. This is the design.md §5.3 "raw bytes through"
|
||||
// approach — MCP is JSON-RPC 2.0 on both transports, so we can pipe
|
||||
// without understanding semantics.
|
||||
//
|
||||
// One Bridge wraps one child. Multiple HTTP requests can be in flight
|
||||
// concurrently; responses are routed back to the right HTTP handler by
|
||||
// matching their `id` field against pending waiters.
|
||||
package bridge
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Bridge owns the message routing for a single supervised child.
|
||||
type Bridge struct {
|
||||
stdin io.Writer
|
||||
stdout *bufio.Reader
|
||||
done <-chan struct{}
|
||||
log *slog.Logger
|
||||
|
||||
writeMu sync.Mutex // serializes writes to stdin
|
||||
pending sync.Map // string(id JSON) -> chan []byte
|
||||
started atomic.Bool
|
||||
closed atomic.Bool
|
||||
}
|
||||
|
||||
// New constructs a Bridge over the supplied pipes. The caller is
|
||||
// responsible for keeping stdin/stdout alive for the bridge's lifetime
|
||||
// (typically by holding the supervisor.Child).
|
||||
func New(stdin io.Writer, stdout *bufio.Reader, done <-chan struct{}, log *slog.Logger) *Bridge {
|
||||
if log == nil {
|
||||
log = slog.New(slog.DiscardHandler)
|
||||
}
|
||||
return &Bridge{stdin: stdin, stdout: stdout, done: done, log: log}
|
||||
}
|
||||
|
||||
// Start kicks off the reader goroutine. Must be called exactly once
|
||||
// before HandleSSE is used; calling twice is a programmer error and is
|
||||
// silently ignored.
|
||||
func (b *Bridge) Start() {
|
||||
if !b.started.CompareAndSwap(false, true) {
|
||||
return
|
||||
}
|
||||
go b.readLoop()
|
||||
}
|
||||
|
||||
// HandleSSE forwards the request body to the child as one newline-framed
|
||||
// JSON-RPC line, then streams the matching response back as a single SSE
|
||||
// event. Returns when the response is delivered, the client disconnects,
|
||||
// or the child exits.
|
||||
//
|
||||
// Errors before SSE headers go out: written as plain HTTP errors with the
|
||||
// appropriate status. Errors after headers: best-effort write of an SSE
|
||||
// `error` event, then return.
|
||||
func (b *Bridge) HandleSSE(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "read body: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if len(body) == 0 {
|
||||
http.Error(w, "empty body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
idKey, ok := extractIDKey(body)
|
||||
if !ok {
|
||||
// No id → notification. Forward without waiting; reply 204.
|
||||
// (MCP notifications don't expect a response.)
|
||||
if err := b.send(body); err != nil {
|
||||
http.Error(w, "send: "+err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
// Register a waiter before sending so a fast child can't reply before
|
||||
// we're ready to receive.
|
||||
respCh := make(chan []byte, 1)
|
||||
if _, loaded := b.pending.LoadOrStore(idKey, respCh); loaded {
|
||||
http.Error(w, "duplicate in-flight id", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
defer b.pending.Delete(idKey)
|
||||
|
||||
if err := b.send(body); err != nil {
|
||||
http.Error(w, "send: "+err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
// SSE headers must go out before the first event.
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher, _ := w.(http.Flusher)
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
select {
|
||||
case resp := <-respCh:
|
||||
writeSSEEvent(w, "message", resp)
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
case <-r.Context().Done():
|
||||
// Client disconnected. Just return — child keeps running, future
|
||||
// requests on the same bridge are unaffected.
|
||||
b.log.Debug("client disconnected mid-stream", slog.String("id", idKey))
|
||||
case <-b.done:
|
||||
writeSSEEvent(w, "error", []byte(`{"reason":"child_exited"}`))
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// send writes one newline-framed message to the child. Concurrent writes
|
||||
// to the underlying pipe are not safe (interleaved bytes), so we serialize
|
||||
// here.
|
||||
func (b *Bridge) send(msg []byte) error {
|
||||
b.writeMu.Lock()
|
||||
defer b.writeMu.Unlock()
|
||||
if _, err := b.stdin.Write(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
|
||||
if _, err := b.stdin.Write([]byte{'\n'}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readLoop reads one JSON-RPC message per line from the child's stdout
|
||||
// and routes each to the waiting handler (if any). Lines without an id
|
||||
// (server-initiated notifications) are dropped on the floor for now —
|
||||
// phase 5 introduces the GET-stream channel for those.
|
||||
func (b *Bridge) readLoop() {
|
||||
for {
|
||||
line, err := b.stdout.ReadString('\n')
|
||||
if line != "" {
|
||||
b.routeResponse(line)
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
b.log.Debug("child stdout closed; bridge reader exiting")
|
||||
} else {
|
||||
b.log.Warn("bridge reader error", slog.String("err", err.Error()))
|
||||
}
|
||||
b.closed.Store(true)
|
||||
// Wake any pending waiters so handlers don't hang forever after
|
||||
// the child dies.
|
||||
b.pending.Range(func(k, v any) bool {
|
||||
if ch, ok := v.(chan []byte); ok {
|
||||
select {
|
||||
case <-ch:
|
||||
default:
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bridge) routeResponse(line string) {
|
||||
idKey, ok := extractIDKey([]byte(line))
|
||||
if !ok {
|
||||
// Server-initiated notification or malformed message — log and skip.
|
||||
b.log.Debug("bridge: dropping un-routable line", slog.Int("len", len(line)))
|
||||
return
|
||||
}
|
||||
v, ok := b.pending.LoadAndDelete(idKey)
|
||||
if !ok {
|
||||
// No waiter — request was likely abandoned (client disconnected).
|
||||
// Logging at debug; this is normal.
|
||||
b.log.Debug("bridge: response with no waiter", slog.String("id", idKey))
|
||||
return
|
||||
}
|
||||
ch, _ := v.(chan []byte)
|
||||
if ch == nil {
|
||||
return
|
||||
}
|
||||
// Non-blocking send: the caller created the channel with buffer 1, so
|
||||
// this is guaranteed to succeed unless we're racing a Close.
|
||||
select {
|
||||
case ch <- []byte(line):
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// extractIDKey parses the JSON-RPC `id` field and returns its raw JSON
|
||||
// representation as a routing key. Returns ok=false when the message has
|
||||
// no id (notification) or fails to parse.
|
||||
//
|
||||
// JSON-RPC ids are string, number, or null. Comparing the raw JSON token
|
||||
// avoids quirks like distinguishing 1 from "1".
|
||||
func extractIDKey(data []byte) (string, bool) {
|
||||
var probe struct {
|
||||
ID json.RawMessage `json:"id"`
|
||||
}
|
||||
if err := json.Unmarshal(data, &probe); err != nil {
|
||||
return "", false
|
||||
}
|
||||
if len(probe.ID) == 0 || string(probe.ID) == "null" {
|
||||
return "", false
|
||||
}
|
||||
return string(probe.ID), true
|
||||
}
|
||||
|
||||
// writeSSEEvent writes one Server-Sent Event to w. The data may be
|
||||
// multi-line — we re-frame it per the SSE spec (each \n becomes a
|
||||
// separate `data:` line). For our typical single-line JSON, this is a
|
||||
// no-op.
|
||||
func writeSSEEvent(w io.Writer, event string, data []byte) {
|
||||
fmt.Fprintf(w, "event: %s\n", event)
|
||||
for _, line := range splitLines(data) {
|
||||
fmt.Fprintf(w, "data: %s\n", line)
|
||||
}
|
||||
_, _ = w.Write([]byte("\n"))
|
||||
}
|
||||
|
||||
func splitLines(data []byte) []string {
|
||||
// Strip a single trailing newline (typical for child output).
|
||||
if n := len(data); n > 0 && data[n-1] == '\n' {
|
||||
data = data[:n-1]
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return []string{""}
|
||||
}
|
||||
out := []string{}
|
||||
start := 0
|
||||
for i, b := range data {
|
||||
if b == '\n' {
|
||||
out = append(out, string(data[start:i]))
|
||||
start = i + 1
|
||||
}
|
||||
}
|
||||
out = append(out, string(data[start:]))
|
||||
return out
|
||||
}
|
||||
|
||||
440
internal/bridge/bridge_test.go
Normal file
440
internal/bridge/bridge_test.go
Normal file
|
|
@ -0,0 +1,440 @@
|
|||
package bridge_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/bridge"
|
||||
brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log"
|
||||
)
|
||||
|
||||
// fakeChild simulates a supervised stdio subprocess via io.Pipe pairs.
|
||||
// Test code writes to stdoutW (acting as the child sending output) and
|
||||
// reads from stdinR (acting as the child receiving input).
|
||||
type fakeChild struct {
|
||||
stdinR *io.PipeReader
|
||||
stdinW *io.PipeWriter
|
||||
stdoutR *io.PipeReader
|
||||
stdoutW *io.PipeWriter
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newFakeChild() *fakeChild {
|
||||
stdinR, stdinW := io.Pipe()
|
||||
stdoutR, stdoutW := io.Pipe()
|
||||
return &fakeChild{
|
||||
stdinR: stdinR,
|
||||
stdinW: stdinW,
|
||||
stdoutR: stdoutR,
|
||||
stdoutW: stdoutW,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// kill closes the stdout pipe to simulate the child exiting.
|
||||
func (f *fakeChild) kill() {
|
||||
_ = f.stdoutW.Close()
|
||||
close(f.done)
|
||||
}
|
||||
|
||||
// readChildStdin reads one newline-terminated line from what the bridge
|
||||
// sent to the child.
|
||||
func (f *fakeChild) readChildStdin(t *testing.T) string {
|
||||
t.Helper()
|
||||
br := bufio.NewReader(f.stdinR)
|
||||
line, err := br.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("read child stdin: %v", err)
|
||||
}
|
||||
return line
|
||||
}
|
||||
|
||||
// writeChildStdout writes a line as if the child produced it. Adds the
|
||||
// trailing newline if missing.
|
||||
func (f *fakeChild) writeChildStdout(t *testing.T, line string) {
|
||||
t.Helper()
|
||||
if !strings.HasSuffix(line, "\n") {
|
||||
line += "\n"
|
||||
}
|
||||
if _, err := io.WriteString(f.stdoutW, line); err != nil {
|
||||
t.Fatalf("write child stdout: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func newBridge(t *testing.T, fc *fakeChild) *bridge.Bridge {
|
||||
t.Helper()
|
||||
b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard())
|
||||
b.Start()
|
||||
t.Cleanup(func() {
|
||||
_ = fc.stdoutW.Close()
|
||||
_ = fc.stdinR.Close()
|
||||
})
|
||||
return b
|
||||
}
|
||||
|
||||
func TestHandleSSE_RoundTrip(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Drive the child side: read what the bridge sends, reply with a
|
||||
// JSON-RPC response carrying the same id.
|
||||
go func() {
|
||||
_ = fc.readChildStdin(t)
|
||||
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":{"ok":true}}`)
|
||||
}()
|
||||
|
||||
resp, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("status = %d, want 200; body: %s", resp.StatusCode, body)
|
||||
}
|
||||
if ct := resp.Header.Get("Content-Type"); ct != "text/event-stream" {
|
||||
t.Errorf("Content-Type = %q, want text/event-stream", ct)
|
||||
}
|
||||
got := string(body)
|
||||
if !strings.Contains(got, "event: message") {
|
||||
t.Errorf("body missing message event: %q", got)
|
||||
}
|
||||
if !strings.Contains(got, `"result":{"ok":true}`) {
|
||||
t.Errorf("body missing JSON-RPC payload: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSSE_ConcurrentRequests_RoutedById(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Driver: read incoming messages from the bridge in lockstep and
|
||||
// reply to each by id.
|
||||
var seen sync.Map
|
||||
go func() {
|
||||
br := bufio.NewReader(fc.stdinR)
|
||||
for {
|
||||
line, err := br.ReadString('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Trivial id extraction: search for "id":N. Sufficient for the
|
||||
// known message shape used in this test.
|
||||
id := extractTestID(line)
|
||||
seen.Store(id, true)
|
||||
fc.writeChildStdout(t, fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"result":%s}`+"\n", id, id))
|
||||
}
|
||||
}()
|
||||
|
||||
// Fire 10 concurrent requests with distinct ids.
|
||||
const N = 10
|
||||
var wg sync.WaitGroup
|
||||
results := make([]string, N)
|
||||
for i := 0; i < N; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
body := fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"echo"}`, i)
|
||||
resp, err := http.Post(srv.URL, "application/json", strings.NewReader(body))
|
||||
if err != nil {
|
||||
t.Errorf("POST id=%d: %v", i, err)
|
||||
return
|
||||
}
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
results[i] = string(b)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i, body := range results {
|
||||
want := fmt.Sprintf(`"result":%d`, i)
|
||||
if !strings.Contains(body, want) {
|
||||
t.Errorf("id=%d body missing %q, got: %s", i, want, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSSE_NotificationReturns204(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Drain the child's stdin so the write doesn't block.
|
||||
gotLine := make(chan string, 1)
|
||||
go func() { gotLine <- fc.readChildStdin(t) }()
|
||||
|
||||
resp, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","method":"notify","params":{}}`))
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
t.Errorf("status = %d, want 204", resp.StatusCode)
|
||||
}
|
||||
select {
|
||||
case line := <-gotLine:
|
||||
if !strings.Contains(line, `"method":"notify"`) {
|
||||
t.Errorf("forwarded line missing method: %q", line)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Error("notification was not forwarded to child")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSSE_EmptyBody_400(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
resp, err := http.Post(srv.URL, "application/json", strings.NewReader(""))
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusBadRequest {
|
||||
t.Errorf("status = %d, want 400", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSSE_DuplicateInflightID_409(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// First request blocks waiting for child reply (we never send one).
|
||||
first := make(chan *http.Response, 1)
|
||||
firstCtx, firstCancel := context.WithCancel(t.Context())
|
||||
defer firstCancel()
|
||||
go func() {
|
||||
req, _ := http.NewRequestWithContext(firstCtx, http.MethodPost, srv.URL,
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`))
|
||||
resp, _ := http.DefaultClient.Do(req)
|
||||
first <- resp
|
||||
}()
|
||||
|
||||
// Drain the child stdin so the first request can register its waiter.
|
||||
go func() { _ = fc.readChildStdin(t) }()
|
||||
|
||||
// Wait briefly for the first request's waiter to register.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Second request with same id must 409.
|
||||
resp, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"slow"}`))
|
||||
if err != nil {
|
||||
t.Fatalf("second POST: %v", err)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusConflict {
|
||||
t.Errorf("status = %d, want 409", resp.StatusCode)
|
||||
}
|
||||
|
||||
// Cleanup: cancel the first request so it doesn't leak.
|
||||
firstCancel()
|
||||
<-first
|
||||
}
|
||||
|
||||
func TestHandleSSE_ClientDisconnectMidStream_DoesNotKillChild(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Drain child stdin so the bridge can register the waiter.
|
||||
go func() { _ = fc.readChildStdin(t) }()
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, srv.URL,
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":42,"method":"slow"}`))
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
// Disconnect immediately.
|
||||
_ = resp.Body.Close()
|
||||
cancel()
|
||||
|
||||
// Bridge should still be functional for a new request.
|
||||
go func() {
|
||||
_ = fc.readChildStdin(t)
|
||||
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"alive"}`)
|
||||
}()
|
||||
|
||||
resp2, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":99,"method":"ping"}`))
|
||||
if err != nil {
|
||||
t.Fatalf("second POST after disconnect: %v", err)
|
||||
}
|
||||
body, _ := io.ReadAll(resp2.Body)
|
||||
_ = resp2.Body.Close()
|
||||
if !strings.Contains(string(body), `"result":"alive"`) {
|
||||
t.Errorf("post-disconnect request did not get response: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSSE_ChildExits_SendsErrorEvent(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
go func() {
|
||||
_ = fc.readChildStdin(t)
|
||||
// Don't reply; just kill the child.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
fc.kill()
|
||||
}()
|
||||
|
||||
resp, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":7,"method":"will_fail"}`))
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
|
||||
if !strings.Contains(string(body), "event: error") {
|
||||
t.Errorf("body should contain error event when child exits: %s", body)
|
||||
}
|
||||
if !strings.Contains(string(body), "child_exited") {
|
||||
t.Errorf("body should mention child_exited: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleSSE_StartTwice_NoOp(t *testing.T) {
|
||||
// Calling Start twice must not panic or spawn a second reader.
|
||||
fc := newFakeChild()
|
||||
b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, brokerlog.Discard())
|
||||
b.Start()
|
||||
b.Start() // should be a no-op
|
||||
t.Cleanup(func() {
|
||||
_ = fc.stdoutW.Close()
|
||||
_ = fc.stdinR.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func TestReadLoop_UnsolicitedResponseDropped(t *testing.T) {
|
||||
// Stale/unsolicited response with an id we never sent must not crash
|
||||
// the reader; subsequent in-flight requests should still work.
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
|
||||
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":99,"result":"stale"}`)
|
||||
// Give the reader a tick to consume the unsolicited line.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
go func() {
|
||||
_ = fc.readChildStdin(t)
|
||||
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"after"}`)
|
||||
}()
|
||||
resp, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if !strings.Contains(string(body), `"result":"after"`) {
|
||||
t.Errorf("stale unsolicited response broke later request: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadLoop_MalformedLinesDropped(t *testing.T) {
|
||||
// Garbage from the child (not JSON, or JSON without id) is silently
|
||||
// dropped — must not break the reader.
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
|
||||
fc.writeChildStdout(t, "not json at all")
|
||||
fc.writeChildStdout(t, `{"jsonrpc":"2.0","method":"server-notification"}`) // no id
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
go func() {
|
||||
_ = fc.readChildStdin(t)
|
||||
fc.writeChildStdout(t, `{"jsonrpc":"2.0","id":1,"result":"valid"}`)
|
||||
}()
|
||||
resp, _ := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if !strings.Contains(string(body), `"result":"valid"`) {
|
||||
t.Errorf("malformed lines broke subsequent request: %s", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNew_NilLogger_NoPanic(t *testing.T) {
|
||||
fc := newFakeChild()
|
||||
b := bridge.New(fc.stdinW, bufio.NewReader(fc.stdoutR), fc.done, nil)
|
||||
b.Start()
|
||||
t.Cleanup(func() {
|
||||
_ = fc.stdoutW.Close()
|
||||
_ = fc.stdinR.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleSSE_SendError_BadGateway(t *testing.T) {
|
||||
// If writing to the child fails (e.g. broken pipe), the handler must
|
||||
// surface a 502 rather than appear to succeed.
|
||||
fc := newFakeChild()
|
||||
b := newBridge(t, fc)
|
||||
srv := httptest.NewServer(http.HandlerFunc(b.HandleSSE))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Close the read end of the stdin pipe — bridge's writes will hit
|
||||
// io.ErrClosedPipe.
|
||||
_ = fc.stdinR.Close()
|
||||
|
||||
resp, err := http.Post(srv.URL, "application/json",
|
||||
strings.NewReader(`{"jsonrpc":"2.0","id":1,"method":"ping"}`))
|
||||
if err != nil {
|
||||
t.Fatalf("POST: %v", err)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusBadGateway {
|
||||
t.Errorf("status = %d, want 502", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// extractTestID is a very small JSON probe — it returns the substring
|
||||
// after `"id":` and before the first comma or closing brace. Sufficient
|
||||
// for the controlled message shape used in the concurrent-requests test.
|
||||
func extractTestID(line string) string {
|
||||
idx := strings.Index(line, `"id":`)
|
||||
if idx < 0 {
|
||||
return ""
|
||||
}
|
||||
rest := line[idx+5:]
|
||||
end := strings.IndexAny(rest, ",}")
|
||||
if end < 0 {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(rest[:end])
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue