Skip to main content

Real-world example

This guide walks through how two production applications — flink-admin and lakehouse-admin — use the httpserver package. These patterns demonstrate best practices for structuring a real application.

Application structure

Both applications follow the same layout:

backend/
├── main.go
├── config.dist.yml
├── internal/
│ ├── handler_checkpoints.go
│ ├── handler_deployments.go
│ ├── handler_events.go
│ └── types.go
└── public/ # Embedded React frontend
└── index.html

Module registration

The server is registered as a kernel module factory using application.WithModuleFactory:

application.WithModuleFactory("http", httpserver.NewServer("default", func(ctx context.Context, config cfg.Config, logger log.Logger, router *httpserver.Router) error {
// ... route registration
return nil
}))

This gives you control over the module name ("http") and lets you combine the HTTP server with other modules (consumers, background workers, etc.) in the same application.

Middleware setup

Global middleware is registered before routes. The typical setup includes CORS and embedded static file serving:

router.Use(cors.Default())
router.UseFactory(httpserver.CreateEmbeddedStaticServe(publicFs, "public", "/api"))

The "/api" exclusion ensures API routes are handled by handlers, not the static file middleware.

Route organization with groups

Routes are organized into domain-specific groups, each with its own handler:

router.Group("/api/deployments").HandleWith(httpserver.With(internal.NewHandlerDeployments, func(r *httpserver.Router, h *internal.HandlerDeployments) {
r.GET("/watch", httpserver.BindSseN(h.WatchDeployments))
}))

deploymentGroup := router.Group("/api/deployments/:namespace/:name")
deploymentGroup.HandleWith(httpserver.With(internal.NewHandlerCheckpoints, func(r *httpserver.Router, h *internal.HandlerCheckpoints) {
r.GET("/checkpoints", httpserver.Bind(h.GetCheckpoints))
}))
deploymentGroup.HandleWith(httpserver.With(internal.NewHandlerEvents, func(r *httpserver.Router, h *internal.HandlerEvents) {
r.GET("/events", httpserver.Bind(h.GetEvents))
}))

Key patterns here:

  • A shared route group (/api/deployments/:namespace/:name) is reused across multiple handler groups
  • Each handler group has its own constructor, so each handler only loads the dependencies it needs
  • URI parameters (:namespace, :name) are captured in a shared input type

Shared input types

Common request types are defined in a shared file:

// internal/types.go
type DeploymentSelectorInput struct {
Namespace string `uri:"namespace"`
Name string `uri:"name"`
}

Multiple handlers reference this same type:

func (h *HandlerCheckpoints) GetCheckpoints(ctx context.Context, request *DeploymentSelectorInput) (httpserver.Response, error) {
// request.Namespace and request.Name are populated from the URI
}

Mixed binding sources

Handlers often combine URI parameters with query or body parameters:

type ListTasksInput struct {
Database string `uri:"database"`
Table string `form:"table"`
Kind []string `form:"kind"`
Status []string `form:"status"`
Limit int `form:"limit"`
Offset int `form:"offset"`
}

type BatchExpireSnapshotsInput struct {
Database string `uri:"database"`
Tables []string `json:"tables"`
RetentionDays int `json:"retention_days"`
}

SSE for real-time updates

The flink-admin uses SSE to stream deployment status changes:

func (h *HandlerDeployments) WatchDeployments(ctx context.Context, writer *httpserver.SseWriter) error {
deployments, updates, stop := h.watcher.Watch(ctx)
defer close(stop)

for deployment := range updates {
data, _ := json.Marshal(deployment)
err := writer.SendEvent(httpserver.SseEvent{
Data: string(data),
Id: makeDeploymentEventID(deployment),
Retry: 5000,
})
if err != nil {
return err
}
}
return nil
}

The SSE endpoint is excluded from compression in the config:

httpserver:
default:
port: 8082
compression:
exclude:
path:
- /api/deployments/watch

Error handling patterns

Handlers distinguish between client errors (returned as structured responses) and internal errors (returned as Go errors). By default, internal errors return a sanitized {"err":"internal server error"} response while the server logs the actual error:

func (h *HandlerBrowse) ListFiles(ctx context.Context, input *ListFilesInput) (httpserver.Response, error) {
items, err := h.files.ListFiles(ctx, input.Database, input.Table, input.Partitions)
if err != nil {
if isBadRequest(err) {
return httpserver.GetErrorHandler()(http.StatusBadRequest, err), nil
}
return nil, fmt.Errorf("could not list files: %w", err)
}
return httpserver.NewJsonResponse(ListFilesResponse{Files: items}), nil
}

Configuration

Production applications use extended timeouts for long-running operations:

httpserver:
default:
port: 8081
mode: release
timeout:
read: 10m
write: 10m
max_body_bytes: 10485760

Raise max_body_bytes for APIs that intentionally accept larger uploads, or set it to 0 to disable the request body limit.

Summary of patterns

PatternWhen to use
router.Group().HandleWith(With(...))Organize routes by domain
Shared route groupsMultiple handlers on the same URL prefix
Shared input typesMultiple handlers accept the same URI params
Mixed binding (uri + json + form)Endpoints that accept path params + query/body
BindSseN for SSEReal-time streaming endpoints
GetErrorHandler()(status, err)Client errors with specific status codes
nil, errorUnexpected/internal errors (500)
Compression exclusion for SSEAlways exclude SSE paths from gzip