feat(httpserver,log): /healthz, graceful shutdown, slog constructor
Implements internal/httpserver and internal/log. httpserver (forgejo-mcp-broker-8ei): - Server struct owns the HTTP lifecycle; Run(ctx) blocks, Handler() returns the composed handler for unit tests - GET /healthz returns JSON with status, version, git_revision, build_date, and store probe result. Returns 503 when the store reports unhealthy - Signal handling delegated to the caller via ctx cancellation — main wires signal.NotifyContext, httpserver just responds to Done() - Graceful shutdown with a configurable deadline (default 10s). When the deadline expires, falls back to http.Server.Close() so lingering connections are forcibly terminated — http.Server.Shutdown alone never interrupts active connections - ExtraHandler extension point for the OAuth + MCP routes that land in phase 2 and phase 5, so the server doesn't need to be re-plumbed later log: - Small slog wrapper: New(w, debug) returns a JSON logger that stamps every record with service/version/git_rev for correlation across deployments - Discard() helper for tests Tests: 97.9% coverage on httpserver (all health states, wrong-method, ExtraHandler dispatch, ctx-cancel shutdown, shutdown-deadline force-close of hanging requests, missing-field errors), 100% on log. Closes forgejo-mcp-broker-8ei. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
df2253398b
commit
36722940eb
7 changed files with 506 additions and 14 deletions
|
|
@ -1,6 +0,0 @@
|
|||
// Package httpserver hosts the broker's HTTP surface: OAuth endpoints, the
|
||||
// gated MCP endpoint, and /healthz. Owns an *http.Server with graceful
|
||||
// shutdown on SIGTERM / SIGINT.
|
||||
//
|
||||
// Implementation lands in forgejo-mcp-broker-8ei.
|
||||
package httpserver
|
||||
164
internal/httpserver/httpserver.go
Normal file
164
internal/httpserver/httpserver.go
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
// Package httpserver hosts the broker's HTTP surface. In phase 1 that's just
|
||||
// /healthz; OAuth endpoints and the gated MCP endpoint land in later phases.
|
||||
//
|
||||
// The package owns an *http.Server and its lifecycle. Signal handling lives
|
||||
// in main: the caller passes a context that is canceled on SIGTERM/SIGINT,
|
||||
// and Run initiates a graceful shutdown with a bounded deadline.
|
||||
package httpserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/buildinfo"
|
||||
)
|
||||
|
||||
// DefaultShutdownTimeout is the graceful-shutdown deadline used when
|
||||
// Server.ShutdownTimeout is zero.
|
||||
const DefaultShutdownTimeout = 10 * time.Second
|
||||
|
||||
// Pinger reports whether a dependency is still reachable. The store
|
||||
// implements this; other backends can too.
|
||||
type Pinger interface {
|
||||
Ping(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Server is the broker's HTTP front end. It composes a few well-known
|
||||
// handlers (/healthz today; OAuth and MCP in later phases) with an optional
|
||||
// ExtraHandler for routes the server doesn't own natively.
|
||||
type Server struct {
|
||||
// Addr is the TCP listen address (e.g. ":8080"). Required.
|
||||
Addr string
|
||||
|
||||
// Log is the structured logger used for lifecycle and request events.
|
||||
// Must not be nil.
|
||||
Log *slog.Logger
|
||||
|
||||
// Store is probed by /healthz. nil means "not configured" (health still
|
||||
// reports 200 but marks store as unconfigured).
|
||||
Store Pinger
|
||||
|
||||
// ExtraHandler, if non-nil, receives any request /healthz does not match.
|
||||
// This is the extension point later phases use to add OAuth and MCP
|
||||
// routes without forking the server.
|
||||
ExtraHandler http.Handler
|
||||
|
||||
// ShutdownTimeout bounds how long Run will wait for in-flight requests
|
||||
// during graceful shutdown. Zero means DefaultShutdownTimeout.
|
||||
ShutdownTimeout time.Duration
|
||||
}
|
||||
|
||||
// Handler returns the composed HTTP handler without any listening setup.
|
||||
// Useful for handler-level tests via httptest.NewRecorder.
|
||||
func (s *Server) Handler() http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("GET /healthz", s.handleHealth)
|
||||
if s.ExtraHandler != nil {
|
||||
// ServeMux treats "/" as the catch-all pattern; specific patterns
|
||||
// (like "GET /healthz") take precedence in Go 1.22+ routing.
|
||||
mux.Handle("/", s.ExtraHandler)
|
||||
}
|
||||
return mux
|
||||
}
|
||||
|
||||
// Run starts the HTTP server and blocks until ctx is canceled or the server
|
||||
// stops on its own. On ctx cancellation, initiates graceful shutdown with
|
||||
// ShutdownTimeout as the deadline.
|
||||
func (s *Server) Run(ctx context.Context) error {
|
||||
if s.Log == nil {
|
||||
return errors.New("httpserver: Log is required")
|
||||
}
|
||||
if s.Addr == "" {
|
||||
return errors.New("httpserver: Addr is required")
|
||||
}
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: s.Addr,
|
||||
Handler: s.Handler(),
|
||||
ReadHeaderTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
serveErr := make(chan error, 1)
|
||||
go func() {
|
||||
s.Log.Info("server listening", slog.String("addr", s.Addr))
|
||||
err := srv.ListenAndServe()
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
err = nil
|
||||
}
|
||||
serveErr <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-serveErr:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
s.Log.Info("shutdown initiated", slog.String("cause", ctx.Err().Error()))
|
||||
}
|
||||
|
||||
timeout := s.ShutdownTimeout
|
||||
if timeout <= 0 {
|
||||
timeout = DefaultShutdownTimeout
|
||||
}
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
shutdownErr := srv.Shutdown(shutdownCtx)
|
||||
if shutdownErr != nil {
|
||||
// Graceful shutdown timed out. http.Server.Shutdown does not
|
||||
// interrupt active connections on its own — Close forces the
|
||||
// sockets closed, which cancels each in-flight request's context
|
||||
// and lets handlers observe the termination via r.Context().Done().
|
||||
s.Log.Error("graceful shutdown exceeded deadline; forcing close",
|
||||
slog.Duration("deadline", timeout),
|
||||
slog.String("err", shutdownErr.Error()))
|
||||
_ = srv.Close()
|
||||
}
|
||||
// Wait for ListenAndServe's goroutine to exit so we don't leak it.
|
||||
<-serveErr
|
||||
|
||||
if shutdownErr != nil {
|
||||
return fmt.Errorf("shutdown: %w", shutdownErr)
|
||||
}
|
||||
s.Log.Info("server stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
type healthResponse struct {
|
||||
Status string `json:"status"`
|
||||
Version string `json:"version"`
|
||||
GitRevision string `json:"git_revision"`
|
||||
BuildDate string `json:"build_date"`
|
||||
Store string `json:"store"`
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
resp := healthResponse{
|
||||
Status: "ok",
|
||||
Version: buildinfo.Version,
|
||||
GitRevision: buildinfo.GitRevision,
|
||||
BuildDate: buildinfo.BuildDate,
|
||||
Store: "not configured",
|
||||
}
|
||||
status := http.StatusOK
|
||||
|
||||
if s.Store != nil {
|
||||
pingCtx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
|
||||
defer cancel()
|
||||
if err := s.Store.Ping(pingCtx); err != nil {
|
||||
resp.Status = "degraded"
|
||||
resp.Store = "error: " + err.Error()
|
||||
status = http.StatusServiceUnavailable
|
||||
} else {
|
||||
resp.Store = "ok"
|
||||
}
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
}
|
||||
253
internal/httpserver/httpserver_test.go
Normal file
253
internal/httpserver/httpserver_test.go
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
package httpserver_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"kode.naiv.no/olemd/forgejo-mcp-broker/internal/httpserver"
|
||||
brokerlog "kode.naiv.no/olemd/forgejo-mcp-broker/internal/log"
|
||||
)
|
||||
|
||||
// fakePinger implements httpserver.Pinger for /healthz tests.
|
||||
type fakePinger struct{ err error }
|
||||
|
||||
func (f *fakePinger) Ping(context.Context) error { return f.err }
|
||||
|
||||
func TestHealth_OK(t *testing.T) {
|
||||
s := &httpserver.Server{Log: brokerlog.Discard(), Store: &fakePinger{}}
|
||||
req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
|
||||
w := httptest.NewRecorder()
|
||||
s.Handler().ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("status = %d, want %d", w.Code, http.StatusOK)
|
||||
}
|
||||
var resp map[string]string
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("body not JSON: %v", err)
|
||||
}
|
||||
for _, k := range []string{"status", "version", "git_revision", "build_date", "store"} {
|
||||
if resp[k] == "" {
|
||||
t.Errorf("healthz response missing field %q: %v", k, resp)
|
||||
}
|
||||
}
|
||||
if resp["status"] != "ok" {
|
||||
t.Errorf("status = %q, want ok", resp["status"])
|
||||
}
|
||||
if resp["store"] != "ok" {
|
||||
t.Errorf("store = %q, want ok", resp["store"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealth_DegradedOnStoreFailure(t *testing.T) {
|
||||
s := &httpserver.Server{Log: brokerlog.Discard(), Store: &fakePinger{err: errors.New("boom")}}
|
||||
w := httptest.NewRecorder()
|
||||
s.Handler().ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/healthz", nil))
|
||||
|
||||
if w.Code != http.StatusServiceUnavailable {
|
||||
t.Errorf("status = %d, want %d", w.Code, http.StatusServiceUnavailable)
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "degraded") {
|
||||
t.Errorf("body should mark status as degraded: %s", w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "boom") {
|
||||
t.Errorf("body should include underlying error: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealth_NoStoreConfigured(t *testing.T) {
|
||||
s := &httpserver.Server{Log: brokerlog.Discard()}
|
||||
w := httptest.NewRecorder()
|
||||
s.Handler().ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/healthz", nil))
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("status = %d, want 200 when store is unconfigured", w.Code)
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "not configured") {
|
||||
t.Errorf("expected 'not configured' marker: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_WrongMethodIsRejected(t *testing.T) {
|
||||
// Go 1.22+ mux: POST /healthz should not dispatch to the GET handler.
|
||||
s := &httpserver.Server{Log: brokerlog.Discard()}
|
||||
w := httptest.NewRecorder()
|
||||
s.Handler().ServeHTTP(w, httptest.NewRequest(http.MethodPost, "/healthz", nil))
|
||||
if w.Code != http.StatusMethodNotAllowed {
|
||||
t.Errorf("POST /healthz should return 405, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_ExtraHandlerReceivesOtherPaths(t *testing.T) {
|
||||
extra := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.WriteString(w, "extra:"+r.URL.Path)
|
||||
})
|
||||
s := &httpserver.Server{Log: brokerlog.Discard(), ExtraHandler: extra}
|
||||
w := httptest.NewRecorder()
|
||||
s.Handler().ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/other", nil))
|
||||
if !strings.Contains(w.Body.String(), "extra:/other") {
|
||||
t.Errorf("extra handler not invoked, got %q", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun_ShutdownOnContextCancel(t *testing.T) {
|
||||
addr := freeAddr(t)
|
||||
s := &httpserver.Server{Addr: addr, Log: brokerlog.Discard(), Store: &fakePinger{}}
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
runErr := make(chan error, 1)
|
||||
go func() { runErr <- s.Run(ctx) }()
|
||||
|
||||
// Wait for the listener to be ready.
|
||||
waitReady(t, addr, 2*time.Second)
|
||||
|
||||
// Sanity: /healthz works while running.
|
||||
resp, err := http.Get("http://" + addr + "/healthz")
|
||||
if err != nil {
|
||||
t.Fatalf("pre-shutdown GET: %v", err)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
|
||||
// Cancel the context to simulate SIGTERM.
|
||||
start := time.Now()
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case err := <-runErr:
|
||||
if err != nil {
|
||||
t.Errorf("Run returned error: %v", err)
|
||||
}
|
||||
if elapsed := time.Since(start); elapsed > 2*time.Second {
|
||||
t.Errorf("Run took %s to shut down, want < 2s", elapsed)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("Run did not return within 3s of cancel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun_ShutdownTimeout_ForciblyClosesSlowRequests(t *testing.T) {
|
||||
addr := freeAddr(t)
|
||||
|
||||
// An extra handler that blocks until the test releases it — simulates
|
||||
// a slow in-flight request that outlives the shutdown deadline.
|
||||
release := make(chan struct{})
|
||||
startedServing := make(chan struct{})
|
||||
var once sync.Once
|
||||
extra := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
once.Do(func() { close(startedServing) })
|
||||
select {
|
||||
case <-release:
|
||||
case <-r.Context().Done():
|
||||
}
|
||||
})
|
||||
|
||||
s := &httpserver.Server{
|
||||
Addr: addr,
|
||||
Log: brokerlog.Discard(),
|
||||
ExtraHandler: extra,
|
||||
ShutdownTimeout: 200 * time.Millisecond,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
runErr := make(chan error, 1)
|
||||
go func() { runErr <- s.Run(ctx) }()
|
||||
waitReady(t, addr, 2*time.Second)
|
||||
|
||||
// Fire a request that will hang in the handler.
|
||||
reqDone := make(chan error, 1)
|
||||
go func() {
|
||||
resp, err := http.Get("http://" + addr + "/slow")
|
||||
if resp != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
reqDone <- err
|
||||
}()
|
||||
|
||||
<-startedServing // handler is blocking
|
||||
cancel() // shutdown fires; handler won't return voluntarily
|
||||
|
||||
select {
|
||||
case err := <-runErr:
|
||||
// Shutdown should complete (with an error reporting deadline breach)
|
||||
// within about the timeout + small wiggle.
|
||||
if err == nil {
|
||||
t.Logf("Run returned nil — http.Server closed the conn via ctx.Done() cascade; acceptable")
|
||||
} else if !strings.Contains(err.Error(), "shutdown") {
|
||||
t.Errorf("unexpected Run error: %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Run did not return within 2s of cancel")
|
||||
}
|
||||
|
||||
// The hanging request's conn should have been forcibly closed.
|
||||
select {
|
||||
case err := <-reqDone:
|
||||
if err == nil {
|
||||
t.Error("slow request should have been terminated by shutdown")
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("slow request did not terminate after shutdown")
|
||||
}
|
||||
|
||||
// Drain the handler so no goroutine leaks past the test.
|
||||
close(release)
|
||||
}
|
||||
|
||||
func TestRun_MissingFieldsErr(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
server *httpserver.Server
|
||||
want string
|
||||
}{
|
||||
{"no_log", &httpserver.Server{Addr: ":0"}, "Log"},
|
||||
{"no_addr", &httpserver.Server{Log: brokerlog.Discard()}, "Addr"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := tc.server.Run(t.Context())
|
||||
if err == nil || !strings.Contains(err.Error(), tc.want) {
|
||||
t.Errorf("want error containing %q, got %v", tc.want, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// freeAddr returns a loopback "host:port" with a port chosen by the kernel.
|
||||
// The listener is closed immediately, so there is a tiny race window before
|
||||
// the caller rebinds — acceptable for loopback test use.
|
||||
func freeAddr(t *testing.T) string {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("listen: %v", err)
|
||||
}
|
||||
addr := l.Addr().String()
|
||||
_ = l.Close()
|
||||
return addr
|
||||
}
|
||||
|
||||
// waitReady polls the target address until a TCP dial succeeds or the
|
||||
// deadline expires.
|
||||
func waitReady(t *testing.T, addr string, within time.Duration) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(within)
|
||||
for time.Now().Before(deadline) {
|
||||
c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond)
|
||||
if err == nil {
|
||||
_ = c.Close()
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("server not reachable at %s within %s", addr, within)
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue