diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 586877c..096ee99 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,6 +1,6 @@ {"id":"forgejo-mcp-broker-q4x","title":"Phase 5c: idle reaper + Forgejo token rotation + child respawn","description":"Reaper (30s tick) applies idle timeout. Rotation (1-min tick) refreshes Forgejo tokens expiring \u003c2min, SIGTERMs child, respawns on next request (design.md §6). Token revocation tears down sessions.","acceptance_criteria":"Clock-injected tests: idle kill, rotation triggers respawn, revocation tears down sessions. Smoke test: 20 concurrent sessions for 10min with mid-test rotations.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:18Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:18Z","dependencies":[{"issue_id":"forgejo-mcp-broker-q4x","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:31Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-q4x","depends_on_id":"forgejo-mcp-broker-t81","type":"blocks","created_at":"2026-04-24T17:45:31Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":2,"dependent_count":1,"comment_count":0} -{"id":"forgejo-mcp-broker-ytw","title":"Phase 5b: bearer-token middleware on /mcp","description":"Middleware reads Authorization: Bearer \u003cmcp_token\u003e, resolves via store, attaches Forgejo access token to request context. 401 for missing/expired/revoked.","acceptance_criteria":"Table-driven tests: missing header, malformed, unknown token, expired, revoked, valid. Valid-token path puts Forgejo token on ctx via typed key.","status":"in_progress","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:18Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T15:08:52Z","started_at":"2026-04-27T15:08:52Z","dependencies":[{"issue_id":"forgejo-mcp-broker-ytw","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:30Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0} -{"id":"forgejo-mcp-broker-t81","title":"Phase 5a: session registry + spawn-on-initialize","description":"Map Mcp-Session-Id -\u003e supervisor.Child + user metadata. On first initialize for unknown sid, spawn forgejo-mcp with user's Forgejo token in env, bind to bridge. LastActive bumped per request.","acceptance_criteria":"Tests with fake supervisor + fake bridge cover: spawn-on-initialize, reuse for subsequent messages, unknown-sid returns 410, max-sessions cap enforced.","status":"open","priority":1,"issue_type":"task","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:17Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-24T15:45:17Z","dependencies":[{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-am1","type":"blocks","created_at":"2026-04-24T17:45:29Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:30Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:28Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":3,"dependent_count":2,"comment_count":0} +{"id":"forgejo-mcp-broker-ytw","title":"Phase 5b: bearer-token middleware on /mcp","description":"Middleware reads Authorization: Bearer \u003cmcp_token\u003e, resolves via store, attaches Forgejo access token to request context. 401 for missing/expired/revoked.","acceptance_criteria":"Table-driven tests: missing header, malformed, unknown token, expired, revoked, valid. Valid-token path puts Forgejo token on ctx via typed key.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:18Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T15:10:28Z","started_at":"2026-04-27T15:08:52Z","closed_at":"2026-04-27T15:10:28Z","close_reason":"Bearer middleware shipped: RequireBearer wraps protected handlers, looks up access_tokens by hash, rejects expired/revoked/unknown with RFC 6750 WWW-Authenticate. Session attached to ctx for downstream MCP endpoint use.","dependencies":[{"issue_id":"forgejo-mcp-broker-ytw","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:30Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":1,"comment_count":0} +{"id":"forgejo-mcp-broker-t81","title":"Phase 5a: session registry + spawn-on-initialize","description":"Map Mcp-Session-Id -\u003e supervisor.Child + user metadata. On first initialize for unknown sid, spawn forgejo-mcp with user's Forgejo token in env, bind to bridge. LastActive bumped per request.","acceptance_criteria":"Tests with fake supervisor + fake bridge cover: spawn-on-initialize, reuse for subsequent messages, unknown-sid returns 410, max-sessions cap enforced.","status":"in_progress","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:17Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T15:11:43Z","started_at":"2026-04-27T15:11:43Z","dependencies":[{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-am1","type":"blocks","created_at":"2026-04-24T17:45:29Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-pur","type":"blocks","created_at":"2026-04-24T17:45:30Z","created_by":"Ole-Morten Duesund","metadata":"{}"},{"issue_id":"forgejo-mcp-broker-t81","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:28Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":3,"dependent_count":2,"comment_count":0} {"id":"forgejo-mcp-broker-xot","title":"Phase 4b: bridge integration test against real forgejo-mcp","description":"Drive the bridge with initialize -\u003e tools/list -\u003e tools/call get_forgejo_mcp_server_version against a real forgejo-mcp subprocess. Validates the opaque-pipe assumption.","acceptance_criteria":"Full handshake, tools/list returns expected set, tools/call returns a version string. Tagged as integration test if runtime exceeds 2s.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:16Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T14:28:39Z","started_at":"2026-04-27T14:10:04Z","closed_at":"2026-04-27T14:28:39Z","close_reason":"Bridge integration test passes against real forgejo-mcp 2.2.0: MCP handshake (initialize → notifications/initialized → tools/list → tools/call) round-trips through bridge cleanly. Fake Forgejo covers /api/v1/version and /api/v1/user probes. Phase 4 complete.","dependencies":[{"issue_id":"forgejo-mcp-broker-xot","depends_on_id":"forgejo-mcp-broker-am1","type":"blocks","created_at":"2026-04-24T17:45:28Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":0,"comment_count":0} {"id":"forgejo-mcp-broker-31t","title":"Phase 3b: supervisor stress tests (FD/goroutine/zombie leak detection)","description":"1000 spawn/stop cycles under -race. Verify no FD leak, no goroutine leak (go.uber.org/goleak), no zombies (wait4 returns ECHILD when idle).","acceptance_criteria":"Cycle test passes under -race. FD count stable within a small constant. goleak detects no extra goroutines after test.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T14:04:42Z","started_at":"2026-04-27T12:00:32Z","closed_at":"2026-04-27T14:04:42Z","close_reason":"Stress tests in place: 1000-cycle spawn/reap and 200-cycle Stop both clean under -race; FD/goroutine/zombie deltas all single-digit. Driver: /bin/true and /bin/cat (helper-process recursion at scale exposed an unrelated Go pidfd interaction). Supervisor now defensively closes pipe handles post-Wait.","dependencies":[{"issue_id":"forgejo-mcp-broker-31t","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:26Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":0,"comment_count":0} {"id":"forgejo-mcp-broker-am1","title":"Phase 4a: internal/bridge JSON-RPC pipe + SSE writer","description":"Given a supervisor.Child: inbound HTTP JSON -\u003e newline-framed stdin; stdout lines -\u003e SSE frames. Handle client disconnect without killing the child.","acceptance_criteria":"Unit tests with mock Child that echoes: request/response round trip, multiple concurrent requests with correct id routing, client disconnect mid-stream.","status":"closed","priority":1,"issue_type":"task","assignee":"Ole-Morten Duesund","owner":"olemd@glemt.net","created_at":"2026-04-24T15:45:15Z","created_by":"Ole-Morten Duesund","updated_at":"2026-04-27T11:59:35Z","started_at":"2026-04-27T11:56:15Z","closed_at":"2026-04-27T11:59:35Z","close_reason":"Bridge shipped: per-id routing, SSE responses for request/reply messages, 204 for notifications, structured 4xx/5xx for malformed input. Decoupled from supervisor (takes pipes directly) for clean testing via io.Pipe. 90.0% coverage.","dependencies":[{"issue_id":"forgejo-mcp-broker-am1","depends_on_id":"forgejo-mcp-broker-zuq","type":"blocks","created_at":"2026-04-24T17:45:27Z","created_by":"Ole-Morten Duesund","metadata":"{}"}],"dependency_count":1,"dependent_count":2,"comment_count":0} diff --git a/internal/oauth/auth.go b/internal/oauth/auth.go index 3c5040e..b0bd700 100644 --- a/internal/oauth/auth.go +++ b/internal/oauth/auth.go @@ -42,6 +42,13 @@ func SessionFromContext(ctx context.Context) (*Session, bool) { return s, ok } +// ContextWithSession attaches a Session to ctx using the same key +// RequireBearer uses. Primarily useful in tests that want to drive a +// gated handler without standing up the full OAuth flow. +func ContextWithSession(ctx context.Context, s *Session) context.Context { + return context.WithValue(ctx, sessionCtxKey{}, s) +} + // RequireBearer is HTTP middleware that: // 1. Demands an `Authorization: Bearer ` header. // 2. Looks the token up by SHA-256 hash in access_tokens. diff --git a/internal/session/session.go b/internal/session/session.go new file mode 100644 index 0000000..f45e365 --- /dev/null +++ b/internal/session/session.go @@ -0,0 +1,235 @@ +// Package session is the broker's MCP session glue. It maps the +// `Mcp-Session-Id` header onto a running forgejo-mcp subprocess (managed +// by internal/supervisor) plus a bridge (internal/bridge) that pipes +// JSON-RPC traffic. +// +// One Registry handles all sessions. New session ids are minted on the +// first /mcp POST that arrives without a session header; subsequent +// requests with that header are dispatched to the same backend so a +// single user keeps the same forgejo-mcp child for the life of the +// session. +// +// The Registry knows how to spawn — it does not know how. Phase-5a tests +// inject fake SpawnFuncs to exercise the lifecycle without forking real +// processes. A production wiring lives in cmd/broker (phase 5c will +// finalise that). +package session + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "log/slog" + "net/http" + "sync" + "sync/atomic" + "time" + + "kode.naiv.no/olemd/forgejo-mcp-broker/internal/oauth" +) + +// Backend is the runtime view of one forgejo-mcp subprocess plus its +// bridge. The Registry calls Handler.ServeHTTP for each /mcp request +// belonging to the session and Stop when the session is reaped. +type Backend struct { + Handler http.Handler + Stop func(ctx context.Context) error + Done <-chan struct{} +} + +// SpawnFunc constructs a Backend for the supplied OAuth session. The +// production implementation spawns forgejo-mcp via supervisor and wires +// a bridge; tests pass fakes. +type SpawnFunc func(ctx context.Context, sess *oauth.Session) (*Backend, error) + +// Config bundles the inputs to New. +type Config struct { + Spawn SpawnFunc + MaxSessions int // 0 means unlimited + Log *slog.Logger // optional; defaults to discard + Now func() time.Time // optional; defaults to time.Now +} + +// Registry tracks active sessions, dispatches requests to them, and tears +// them down on broker shutdown. Construct via New — the embedded sync.Map +// makes value copies unsafe. +type Registry struct { + spawn SpawnFunc + maxSessions int + log *slog.Logger + now func() time.Time + + sessions sync.Map // string sid → *entry + count atomic.Int32 +} + +type entry struct { + sid string + backend *Backend + lastActive atomic.Int64 // unix nanoseconds; bumped per request + oauthSess *oauth.Session +} + +// SessionIDHeader is the streamable-HTTP MCP header that ferries the +// session id between client and server. +const SessionIDHeader = "Mcp-Session-Id" + +// New validates the config and returns a Registry ready to serve. +func New(cfg Config) (*Registry, error) { + if cfg.Spawn == nil { + return nil, errors.New("session: Spawn is required") + } + log := cfg.Log + if log == nil { + log = slog.New(slog.DiscardHandler) + } + now := cfg.Now + if now == nil { + now = time.Now + } + return &Registry{ + spawn: cfg.Spawn, + maxSessions: cfg.MaxSessions, + log: log, + now: now, + }, nil +} + +// Handler returns an http.Handler suitable for mounting at /mcp. Wrap it +// with oauth.Authenticator.RequireBearer so every request carries a +// validated Session in its context. +func (r *Registry) Handler() http.Handler { + return http.HandlerFunc(r.serve) +} + +func (r *Registry) serve(w http.ResponseWriter, req *http.Request) { + oauthSess, ok := oauth.SessionFromContext(req.Context()) + if !ok { + // Mounted without RequireBearer; treat as a programmer error. + http.Error(w, "no auth session in context", http.StatusInternalServerError) + return + } + + sid := req.Header.Get(SessionIDHeader) + if sid == "" { + // No session id yet — this must be an `initialize`. Mint a session. + e, err := r.spawnSession(req.Context(), oauthSess) + if err != nil { + r.respondSpawnError(w, err) + return + } + w.Header().Set(SessionIDHeader, e.sid) + e.lastActive.Store(r.now().UnixNano()) + e.backend.Handler.ServeHTTP(w, req) + return + } + + // Lookup; reject unknown session ids with 410 Gone so the client + // re-initialises rather than retrying forever (per MCP guidance). + v, ok := r.sessions.Load(sid) + if !ok { + http.Error(w, "unknown or expired session", http.StatusGone) + return + } + e := v.(*entry) + if e.oauthSess.BrokerTokenHash != oauthSess.BrokerTokenHash { + // Session id is bound to the OAuth token that minted it. A + // different bearer probing a stolen sid gets 403 — not 401, so + // this is distinct from "your token is bad" and from "we don't + // know that sid" (410 above). Defence in depth. + http.Error(w, "session bound to a different token", http.StatusForbidden) + return + } + e.lastActive.Store(r.now().UnixNano()) + e.backend.Handler.ServeHTTP(w, req) +} + +func (r *Registry) spawnSession(ctx context.Context, oauthSess *oauth.Session) (*entry, error) { + if r.maxSessions > 0 && int(r.count.Load()) >= r.maxSessions { + return nil, errMaxSessions + } + + backend, err := r.spawn(ctx, oauthSess) + if err != nil { + return nil, err + } + + sid := newSessionID() + e := &entry{sid: sid, backend: backend, oauthSess: oauthSess} + e.lastActive.Store(r.now().UnixNano()) + + if _, loaded := r.sessions.LoadOrStore(sid, e); loaded { + // Astronomically unlikely (24-byte random collision); roll back. + _ = backend.Stop(ctx) + return nil, errors.New("session: id collision") + } + r.count.Add(1) + + // When the child exits on its own (crash, OOM, etc.), reap the entry. + go func() { + <-backend.Done + r.removeSession(sid) + }() + return e, nil +} + +func (r *Registry) removeSession(sid string) { + if _, ok := r.sessions.LoadAndDelete(sid); ok { + r.count.Add(-1) + } +} + +func (r *Registry) respondSpawnError(w http.ResponseWriter, err error) { + if errors.Is(err, errMaxSessions) { + w.Header().Set("Retry-After", "30") + http.Error(w, "broker at max sessions", http.StatusServiceUnavailable) + return + } + r.log.Error("session spawn failed", slog.String("err", err.Error())) + http.Error(w, "session spawn failed", http.StatusInternalServerError) +} + +// Active returns the number of currently-tracked sessions. Mostly for +// tests and metrics. +func (r *Registry) Active() int { return int(r.count.Load()) } + +// Stop tears down every active session. Used at broker shutdown so +// children don't leak past the parent. Best-effort: stop errors are +// logged but not aggregated. +func (r *Registry) Stop(ctx context.Context) { + r.sessions.Range(func(k, v any) bool { + e := v.(*entry) + if err := e.backend.Stop(ctx); err != nil { + r.log.Warn("session stop", slog.String("sid", e.sid), slog.String("err", err.Error())) + } + r.sessions.Delete(k) + r.count.Add(-1) + return true + }) +} + +// LastActive returns the last-active wall-clock time for sid, or the +// zero time if no such session exists. Phase 5c's reaper uses this to +// evict idle sessions. +func (r *Registry) LastActive(sid string) time.Time { + v, ok := r.sessions.Load(sid) + if !ok { + return time.Time{} + } + return time.Unix(0, v.(*entry).lastActive.Load()) +} + +// errMaxSessions signals the cap was hit. Internal sentinel only — +// callers see a 503. +var errMaxSessions = errors.New("session: max sessions reached") + +// newSessionID returns a hex-encoded 24-byte random id (48 hex chars, +// 192 bits of entropy). Plenty for global uniqueness across a broker. +func newSessionID() string { + b := make([]byte, 24) + if _, err := rand.Read(b); err != nil { + panic("session: crypto/rand failed: " + err.Error()) + } + return hex.EncodeToString(b) +} diff --git a/internal/session/session_test.go b/internal/session/session_test.go new file mode 100644 index 0000000..763636e --- /dev/null +++ b/internal/session/session_test.go @@ -0,0 +1,345 @@ +package session_test + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log" + "kode.naiv.no/olemd/forgejo-mcp-broker/internal/oauth" + "kode.naiv.no/olemd/forgejo-mcp-broker/internal/session" +) + +// fakeBackend is a controllable session.Backend used by all tests in this +// package. It records every Handler invocation and exposes a Done channel +// the tests can close to simulate the child exiting. +type fakeBackend struct { + id int + handler func(w http.ResponseWriter, r *http.Request) + done chan struct{} + stopErr error + stopped atomic.Bool + requests atomic.Int32 +} + +func newFakeBackend(id int) *fakeBackend { + return &fakeBackend{id: id, done: make(chan struct{})} +} + +func (f *fakeBackend) backend() *session.Backend { + return &session.Backend{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + f.requests.Add(1) + if f.handler != nil { + f.handler(w, r) + return + } + fmt.Fprintf(w, "backend-%d-served", f.id) + }), + Stop: func(ctx context.Context) error { + f.stopped.Store(true) + select { + case <-f.done: + default: + close(f.done) + } + return f.stopErr + }, + Done: f.done, + } +} + +// fakeSpawner returns a SpawnFunc that hands out a sequence of fakeBackends. +// The returned slice is appended to as Spawn is called, so tests can +// inspect every backend that was minted. +func fakeSpawner(t *testing.T) (session.SpawnFunc, *[]*fakeBackend) { + t.Helper() + var ( + mu sync.Mutex + backends []*fakeBackend + next int + ) + spawn := func(ctx context.Context, sess *oauth.Session) (*session.Backend, error) { + mu.Lock() + defer mu.Unlock() + fb := newFakeBackend(next) + next++ + backends = append(backends, fb) + return fb.backend(), nil + } + return spawn, &backends +} + +// testBearerHeader carries a bearer-hash discriminator across the wire so +// the test server can swap in the right oauth.Session per request. We +// can't propagate context from client to server through net/http, so this +// header substitutes for what RequireBearer would otherwise inject. +const testBearerHeader = "X-Test-Bearer-Hash" + +// newTestServer wraps the Registry handler with a tiny middleware that +// reads testBearerHeader and attaches a matching oauth.Session to the +// request context. Without the header (i.e. simulating no auth), the +// registry sees a context without a session and returns 500 — exactly +// the production behaviour for unauthenticated /mcp traffic. +func newTestServer(t *testing.T, r *session.Registry) *httptest.Server { + t.Helper() + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if hash := req.Header.Get(testBearerHeader); hash != "" { + ctx := oauth.ContextWithSession(req.Context(), bearerSess(hash)) + req = req.WithContext(ctx) + } + r.Handler().ServeHTTP(w, req) + }) + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + return srv +} + +// helper used at the package level so tests don't have to construct +// oauth.Session manually each time. +func bearerSess(hash string) *oauth.Session { + return &oauth.Session{ + ClientID: "client-" + hash, + ForgejoUsername: "user-" + hash, + BrokerTokenHash: hash, + ForgejoToken: "fj-token-" + hash, + Scopes: "read:user", + } +} + +func TestServe_NewSession_MintsSidAndDispatches(t *testing.T) { + spawn, backends := fakeSpawner(t) + r, err := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + if err != nil { + t.Fatalf("New: %v", err) + } + + srv := newTestServer(t, r) + + resp := doReq(t, srv.URL, "", bearerSess("hash-A"), `{"jsonrpc":"2.0","id":1,"method":"initialize"}`) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("status = %d, want 200; body: %s", resp.StatusCode, body) + } + sid := resp.Header.Get(session.SessionIDHeader) + if sid == "" { + t.Error("response missing Mcp-Session-Id header") + } + if r.Active() != 1 { + t.Errorf("Active() = %d, want 1", r.Active()) + } + if len(*backends) != 1 || (*backends)[0].requests.Load() != 1 { + t.Errorf("backend was not invoked exactly once: %+v", *backends) + } +} + +func TestServe_KnownSid_ReusesBackend(t *testing.T) { + spawn, backends := fakeSpawner(t) + r, _ := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + srv := newTestServer(t, r) + + bearer := bearerSess("hash-B") + + resp1 := doReq(t, srv.URL, "", bearer, `{"jsonrpc":"2.0","id":1,"method":"initialize"}`) + resp1.Body.Close() + sid := resp1.Header.Get(session.SessionIDHeader) + + resp2 := doReq(t, srv.URL, sid, bearer, `{"jsonrpc":"2.0","id":2,"method":"tools/list"}`) + resp2.Body.Close() + if resp2.StatusCode != http.StatusOK { + t.Errorf("second request status = %d, want 200", resp2.StatusCode) + } + + if r.Active() != 1 { + t.Errorf("Active() = %d, want 1 (reuse, not spawn)", r.Active()) + } + if len(*backends) != 1 { + t.Errorf("Spawn called %d times, want 1", len(*backends)) + } + if (*backends)[0].requests.Load() != 2 { + t.Errorf("backend.requests = %d, want 2", (*backends)[0].requests.Load()) + } +} + +func TestServe_UnknownSid_410(t *testing.T) { + spawn, _ := fakeSpawner(t) + r, _ := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + srv := newTestServer(t, r) + + resp := doReq(t, srv.URL, "definitely-not-a-real-sid", bearerSess("hash-C"), `{}`) + resp.Body.Close() + if resp.StatusCode != http.StatusGone { + t.Errorf("status = %d, want 410", resp.StatusCode) + } +} + +func TestServe_TokenMismatch_403(t *testing.T) { + // Two different bearer hashes for the same sid: only the original + // owner can access. + spawn, _ := fakeSpawner(t) + r, _ := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + srv := newTestServer(t, r) + + first := doReq(t, srv.URL, "", bearerSess("alice"), `{"jsonrpc":"2.0","id":1,"method":"initialize"}`) + first.Body.Close() + sid := first.Header.Get(session.SessionIDHeader) + + hijack := doReq(t, srv.URL, sid, bearerSess("eve"), `{"jsonrpc":"2.0","id":2}`) + hijack.Body.Close() + if hijack.StatusCode != http.StatusForbidden { + t.Errorf("status = %d, want 403", hijack.StatusCode) + } +} + +func TestServe_NoAuthSessionInContext_500(t *testing.T) { + // Calling /mcp without going through RequireBearer first is a + // programmer error; the registry surfaces it loudly rather than + // silently spawning an unauthenticated session. + spawn, _ := fakeSpawner(t) + r, _ := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + srv := newTestServer(t, r) + + resp, err := http.Post(srv.URL, "application/json", strings.NewReader(`{}`)) + if err != nil { + t.Fatalf("post: %v", err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusInternalServerError { + t.Errorf("status = %d, want 500", resp.StatusCode) + } +} + +func TestServe_MaxSessionsCap(t *testing.T) { + spawn, _ := fakeSpawner(t) + r, _ := session.New(session.Config{ + Spawn: spawn, Log: brokerlog.Discard(), MaxSessions: 2, + }) + srv := newTestServer(t, r) + + // Two sessions allowed. + for i, hash := range []string{"a", "b"} { + resp := doReq(t, srv.URL, "", bearerSess(hash), `{"jsonrpc":"2.0","id":1,"method":"initialize"}`) + resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("session %d: status = %d", i, resp.StatusCode) + } + } + // Third hits the cap. + resp := doReq(t, srv.URL, "", bearerSess("c"), `{"jsonrpc":"2.0","id":1,"method":"initialize"}`) + resp.Body.Close() + if resp.StatusCode != http.StatusServiceUnavailable { + t.Errorf("third session status = %d, want 503", resp.StatusCode) + } + if resp.Header.Get("Retry-After") == "" { + t.Error("503 response should include Retry-After") + } + if r.Active() != 2 { + t.Errorf("Active() = %d, want 2", r.Active()) + } +} + +func TestServe_BackendDone_RemovesSession(t *testing.T) { + // When the child exits on its own (Done closes), the registry should + // reap the entry so subsequent traffic with that sid gets 410. + spawn, backends := fakeSpawner(t) + r, _ := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + srv := newTestServer(t, r) + + bearer := bearerSess("crashed") + first := doReq(t, srv.URL, "", bearer, `{"jsonrpc":"2.0","id":1,"method":"initialize"}`) + first.Body.Close() + sid := first.Header.Get(session.SessionIDHeader) + + // Simulate the child exiting. + close((*backends)[0].done) + + // Wait for the reaper goroutine — poll Active() rather than add a + // special hook to the production type. + if !waitForActive(r, 0, 2*time.Second) { + t.Fatalf("session count never dropped to 0") + } + + resp := doReq(t, srv.URL, sid, bearer, `{}`) + resp.Body.Close() + if resp.StatusCode != http.StatusGone { + t.Errorf("after backend.Done, status = %d, want 410", resp.StatusCode) + } +} + +func TestStop_TearsDownAllSessions(t *testing.T) { + spawn, backends := fakeSpawner(t) + r, _ := session.New(session.Config{Spawn: spawn, Log: brokerlog.Discard()}) + srv := newTestServer(t, r) + + for _, hash := range []string{"x", "y", "z"} { + resp := doReq(t, srv.URL, "", bearerSess(hash), `{}`) + resp.Body.Close() + } + if r.Active() != 3 { + t.Fatalf("Active = %d, want 3", r.Active()) + } + + r.Stop(context.Background()) + if r.Active() != 0 { + t.Errorf("Active after Stop = %d, want 0", r.Active()) + } + for _, b := range *backends { + if !b.stopped.Load() { + t.Errorf("backend %d not stopped", b.id) + } + } +} + +func TestNew_RequiresSpawn(t *testing.T) { + _, err := session.New(session.Config{Log: brokerlog.Discard()}) + if err == nil || !strings.Contains(err.Error(), "Spawn") { + t.Errorf("want Spawn-required error, got %v", err) + } +} + +// doReq POSTs to the test server with an optional session id and a +// pseudo-bearer hash that the test middleware translates into an +// oauth.Session. Pass an empty hash to simulate an unauthenticated +// request. +func doReq(t *testing.T, url, sid string, sess *oauth.Session, body string) *http.Response { + t.Helper() + req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, url, strings.NewReader(body)) + if err != nil { + t.Fatalf("new request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + if sid != "" { + req.Header.Set(session.SessionIDHeader, sid) + } + if sess != nil { + req.Header.Set(testBearerHeader, sess.BrokerTokenHash) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + return resp +} + +// waitForActive polls r.Active() until it equals target or the deadline +// expires. Returns true on a hit. Used by the backend-Done test to give +// the registry's reaper goroutine time to run. +func waitForActive(r *session.Registry, target int, within time.Duration) bool { + deadline := time.Now().Add(within) + for time.Now().Before(deadline) { + if r.Active() == target { + return true + } + time.Sleep(5 * time.Millisecond) + } + return false +}