cmd/broker/main.go now composes every component from phases 2–5 into a
live binary:
/healthz → internal/httpserver
/oauth/* → internal/oauth.Server.Handler()
/.well-known → internal/oauth.Server.Handler()
/mcp → oauth.Authenticator.RequireBearer
over session.Registry.Handler()
The SpawnFunc passed to the registry composes supervisor + bridge: each
new MCP session forks `forgejo-mcp --transport stdio` with the user's
upstream token in env, wraps stdio with a bridge, and returns the
bridge's HandleSSE as the per-session http.Handler. The reaper is wired
with a refresh callback that calls forgejo.Client.Refresh and persists
rotated tokens back to access_tokens before the rotator swaps the
session's child.
cmd/broker/e2e_test.go is the gating local validation: builds the
binary, builds forgejo-mcp from the sibling repo (skipped if absent),
stands up a fake Forgejo, runs the broker, and walks
register → authorize → callback → token → /mcp initialize → tools/list.
This catches:
- any component left unwired
- the subprocess-context bug fixed in this commit (using a request
context in supervisor.Start kills the child when the request that
minted it returns; the fix is a long-lived childCtx)
- the happy-path Mcp-Session-Id mint+reuse cycle that unit tests
can't exercise without a real subprocess
docs/phase7-findings.md documents both the local automated validation
(this test) and the manual Claude.ai-side checklist (OAuth completes,
tool discovery, tool invocation, session reuse, idle reap, mid-session
token refresh, revocation). The Claude.ai half is fundamentally manual
and stays that way; the automated test catches the broker bugs that
would otherwise hide behind operator setup mistakes.
Closes forgejo-mcp-broker-q6n. Phase 7 — and with it the project's
primary implementation track — is now complete.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
267 lines
8.6 KiB
Go
267 lines
8.6 KiB
Go
// Command fjmcp-broker is an OAuth 2.1 authorization server and MCP session
// broker that fronts forgejo-mcp. See ../../README.md and ../../docs/ for
// the design.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/bridge"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/buildinfo"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/config"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/forgejo"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/httpserver"
|
|
brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/oauth"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/session"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/store"
|
|
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/supervisor"
|
|
)
|
|
|
|
// Process exit codes, following the usual convention: 0 for success,
// 1 for a runtime failure, 2 for a config/usage error. The values come
// from iota, so the declaration order is significant — do not reorder or
// insert entries.
const (
	exitSuccess = iota // 0: clean exit
	exitRuntime        // 1: failure while starting or serving
	exitConfig         // 2: bad flags or configuration
)
|
|
|
|
func main() {
|
|
os.Exit(run(os.Args[1:], os.Stderr))
|
|
}
|
|
|
|
// run is the testable entry point. Parses config, wires dependencies, and
|
|
// blocks until the HTTP server exits or a shutdown signal arrives. Returns
|
|
// an OS exit code.
|
|
func run(args []string, out io.Writer) int {
|
|
// --version is handled before config.Load so operators can inspect a
|
|
// binary without providing the rest of the required config.
|
|
for _, a := range args {
|
|
if a == "--version" || a == "-version" {
|
|
fmt.Fprintf(out, "fjmcp-broker %s (rev %s, built %s)\n",
|
|
buildinfo.Version, buildinfo.GitRevision, buildinfo.BuildDate)
|
|
return exitSuccess
|
|
}
|
|
}
|
|
|
|
cfg, err := config.Load(args, out)
|
|
switch {
|
|
case errors.Is(err, flag.ErrHelp):
|
|
return exitSuccess
|
|
case err != nil:
|
|
fmt.Fprintln(out, "fjmcp-broker: config error:")
|
|
fmt.Fprintln(out, err.Error())
|
|
return exitConfig
|
|
}
|
|
|
|
logger := brokerlog.New(out, cfg.Debug)
|
|
logger.Info("starting broker",
|
|
"listen", cfg.Listen,
|
|
"public_url", cfg.PublicURL,
|
|
"forgejo_url", cfg.ForgejoURL,
|
|
"store_path", cfg.StorePath,
|
|
"max_sessions", cfg.MaxSessions,
|
|
"idle_timeout", cfg.SessionIdleTimeout.String(),
|
|
)
|
|
|
|
// Signal handling is owned here; the HTTP server just responds to ctx
|
|
// cancellation. This keeps internal/httpserver free of signal coupling
|
|
// and makes it testable without any OS-level wiring.
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
st, err := store.Open(ctx, cfg.StorePath)
|
|
if err != nil {
|
|
logger.Error("open store", "err", err.Error())
|
|
return exitRuntime
|
|
}
|
|
defer func() {
|
|
if err := st.Close(); err != nil {
|
|
logger.Error("close store", "err", err.Error())
|
|
}
|
|
}()
|
|
|
|
// childCtx outlives any one HTTP request — it's the broker-process-
|
|
// scoped context that supervisor.Start will associate with each
|
|
// spawned forgejo-mcp. Canceling it on shutdown is what tears the
|
|
// whole subprocess tree down. (Using a request context here would
|
|
// kill children when their /mcp request returns — exec.CommandContext
|
|
// ties process lifetime to the ctx.)
|
|
childCtx, cancelChildren := context.WithCancel(context.Background())
|
|
defer cancelChildren()
|
|
|
|
// Build the OAuth + MCP-session stack as a single http.Handler that we
|
|
// pass to the http server as ExtraHandler. /healthz keeps living in
|
|
// httpserver itself.
|
|
mux, sessReg, stopReaper, err := buildHandlers(cfg, logger, st, childCtx)
|
|
if err != nil {
|
|
logger.Error("wire handlers", "err", err.Error())
|
|
return exitRuntime
|
|
}
|
|
defer stopReaper()
|
|
defer sessReg.Stop(context.Background())
|
|
|
|
srv := &httpserver.Server{
|
|
Addr: cfg.Listen,
|
|
Log: logger,
|
|
Store: st,
|
|
ExtraHandler: mux,
|
|
}
|
|
if err := srv.Run(ctx); err != nil {
|
|
logger.Error("server exit", "err", err.Error())
|
|
return exitRuntime
|
|
}
|
|
logger.Info("broker stopped")
|
|
return exitSuccess
|
|
}
|
|
|
|
// buildHandlers assembles the OAuth server, the bearer-gated /mcp endpoint,
|
|
// the session registry, and the reaper into one http.Handler. The returned
|
|
// stopReaper must be called at shutdown. childCtx is a long-lived context
|
|
// used to spawn forgejo-mcp subprocesses — it must outlive any single
|
|
// /mcp request, otherwise exec.CommandContext kills the child on
|
|
// request completion.
|
|
func buildHandlers(
|
|
cfg *config.Config,
|
|
logger *slog.Logger,
|
|
st *store.Store,
|
|
childCtx context.Context,
|
|
) (http.Handler, *session.Registry, func(), error) {
|
|
fjClient, err := forgejo.NewClient(forgejo.ClientConfig{
|
|
BaseURL: cfg.ForgejoURL,
|
|
ClientID: cfg.ForgejoOAuthClientID,
|
|
ClientSecret: cfg.ForgejoOAuthClientSecret,
|
|
UserAgent: "fjmcp-broker/" + buildinfo.Version,
|
|
})
|
|
if err != nil {
|
|
return nil, nil, nil, fmt.Errorf("forgejo client: %w", err)
|
|
}
|
|
|
|
oauthSrv, err := oauth.NewServer(oauth.Config{
|
|
Store: st,
|
|
Forgejo: fjClient,
|
|
Issuer: cfg.PublicURL,
|
|
Scopes: cfg.ForgejoOAuthScopes,
|
|
Log: logger.With("component", "oauth"),
|
|
})
|
|
if err != nil {
|
|
return nil, nil, nil, fmt.Errorf("oauth server: %w", err)
|
|
}
|
|
|
|
auth := &oauth.Authenticator{Store: st}
|
|
|
|
spawn := func(_ context.Context, sess *oauth.Session) (*session.Backend, error) {
|
|
// Deliberately ignore the per-request context the registry hands
|
|
// us; pass childCtx so the spawned forgejo-mcp survives once the
|
|
// HTTP request that triggered the spawn returns.
|
|
return spawnForgejoMCP(childCtx, cfg, logger, sess)
|
|
}
|
|
|
|
sessReg, err := session.New(session.Config{
|
|
Spawn: spawn,
|
|
MaxSessions: cfg.MaxSessions,
|
|
Log: logger.With("component", "session"),
|
|
})
|
|
if err != nil {
|
|
return nil, nil, nil, fmt.Errorf("session registry: %w", err)
|
|
}
|
|
|
|
stopReaper := sessReg.StartReaper(session.ReaperConfig{
|
|
IdleTimeout: cfg.SessionIdleTimeout,
|
|
RefreshForgejo: refreshFunc(fjClient, st, logger),
|
|
Respawn: spawn,
|
|
})
|
|
|
|
mux := http.NewServeMux()
|
|
// Mount OAuth + discovery at the root: /oauth/*, /.well-known/*.
|
|
// oauth.Server.Handler() declares method-routed patterns, so it
|
|
// composes safely with the gated /mcp below.
|
|
oauthHandler := oauthSrv.Handler()
|
|
mux.Handle("/oauth/", oauthHandler)
|
|
mux.Handle("/.well-known/", oauthHandler)
|
|
// Gated MCP endpoint.
|
|
mux.Handle("POST /mcp", auth.RequireBearer(sessReg.Handler()))
|
|
mux.Handle("GET /mcp", auth.RequireBearer(sessReg.Handler()))
|
|
|
|
return mux, sessReg, stopReaper, nil
|
|
}
|
|
|
|
// spawnForgejoMCP constructs a session.Backend by launching forgejo-mcp
|
|
// under the supervisor and wrapping its stdio with a bridge. The user's
|
|
// upstream Forgejo token is injected via FORGEJO_ACCESS_TOKEN — that's
|
|
// what makes per-session token isolation work.
|
|
func spawnForgejoMCP(
|
|
ctx context.Context,
|
|
cfg *config.Config,
|
|
logger *slog.Logger,
|
|
sess *oauth.Session,
|
|
) (*session.Backend, error) {
|
|
childLog := logger.With(
|
|
"component", "forgejo-mcp",
|
|
"user", sess.ForgejoUsername,
|
|
)
|
|
child, err := supervisor.Start(ctx, supervisor.Config{
|
|
Cmd: []string{cfg.ForgejoMCPBinary, "--transport", "stdio", "--url", cfg.ForgejoURL},
|
|
Env: []string{
|
|
"FORGEJO_ACCESS_TOKEN=" + sess.ForgejoToken,
|
|
"FORGEJO_USER_AGENT=fjmcp-broker/" + buildinfo.Version,
|
|
},
|
|
OnStderr: func(line string) {
|
|
childLog.Debug("stderr", "line", line)
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
br := bridge.New(child.Stdin, child.Stdout, child.Done(), childLog)
|
|
br.Start()
|
|
|
|
return &session.Backend{
|
|
Handler: http.HandlerFunc(br.HandleSSE),
|
|
Stop: child.Stop,
|
|
Done: child.Done(),
|
|
}, nil
|
|
}
|
|
|
|
// refreshFunc adapts the Forgejo OAuth client to the
|
|
// session.ReaperConfig.RefreshForgejo signature: refresh upstream tokens
|
|
// AND persist the new tokens back to the access_tokens row keyed by the
|
|
// session's broker-token hash.
|
|
func refreshFunc(
|
|
fj *forgejo.Client,
|
|
st *store.Store,
|
|
logger *slog.Logger,
|
|
) func(context.Context, *oauth.Session) (string, string, time.Time, error) {
|
|
return func(ctx context.Context, sess *oauth.Session) (string, string, time.Time, error) {
|
|
tok, err := fj.Refresh(ctx, sess.ForgejoRefresh)
|
|
if err != nil {
|
|
return "", "", time.Time{}, fmt.Errorf("forgejo refresh: %w", err)
|
|
}
|
|
expiresAt := time.Now().Add(time.Duration(tok.ExpiresIn) * time.Second)
|
|
_, dbErr := st.DB().ExecContext(ctx,
|
|
`UPDATE access_tokens
|
|
SET forgejo_access_token = ?,
|
|
forgejo_refresh_token = ?,
|
|
forgejo_token_expires_at = ?
|
|
WHERE token_hash = ? AND revoked_at IS NULL`,
|
|
tok.AccessToken, tok.RefreshToken, expiresAt.Unix(), sess.BrokerTokenHash)
|
|
if dbErr != nil && !errors.Is(dbErr, sql.ErrNoRows) {
|
|
logger.Warn("persist refreshed forgejo token", "err", dbErr.Error())
|
|
// Persist failure is non-fatal — the rotator can still
|
|
// hand the new token to the respawned child; the next
|
|
// rotation cycle will retry the persist.
|
|
}
|
|
return tok.AccessToken, tok.RefreshToken, expiresAt, nil
|
|
}
|
|
}
|