diff --git a/cmd/arduino-app-cli/daemon/daemon.go b/cmd/arduino-app-cli/daemon/daemon.go index eeac105c..e91d5cee 100644 --- a/cmd/arduino-app-cli/daemon/daemon.go +++ b/cmd/arduino-app-cli/daemon/daemon.go @@ -24,6 +24,8 @@ import ( "github.com/jub0bs/cors" "github.com/spf13/cobra" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "github.com/arduino/arduino-app-cli/cmd/arduino-app-cli/internal/servicelocator" "github.com/arduino/arduino-app-cli/internal/api" @@ -120,12 +122,15 @@ func httpHandler(ctx context.Context, cfg config.Configuration, daemonPort, vers panic(err) } apiSrv = corsMiddlware.Wrap(apiSrv) + apiSrv = httprecover.RecoverPanic(apiSrv) // Start the HTTP server address := "127.0.0.1:" + daemonPort httpSrv := http.Server{ - Addr: address, - Handler: httprecover.RecoverPanic(apiSrv), + Addr: address, + Handler: h2c.NewHandler(apiSrv, &http2.Server{ + MaxConcurrentStreams: 250, + }), ReadHeaderTimeout: 60 * time.Second, } go func() { diff --git a/debian/arduino-app-cli/etc/systemd/system/arduino-app-cli.service b/debian/arduino-app-cli/etc/systemd/system/arduino-app-cli.service index 93ed0367..fcb02007 100644 --- a/debian/arduino-app-cli/etc/systemd/system/arduino-app-cli.service +++ b/debian/arduino-app-cli/etc/systemd/system/arduino-app-cli.service @@ -4,7 +4,7 @@ After=network-online.target docker.service arduino-router.service Wants=network-online.target docker.service arduino-router.service [Service] -ExecStart=/usr/bin/arduino-app-cli daemon --port 8800 --log-level error +ExecStart=/usr/bin/arduino-app-cli daemon --port 8800 --log-level debug User=arduino Group=arduino Environment="ARDUINO_ROUTER_SOCKET=/var/run/arduino-router.sock" diff --git a/internal/api/handlers/update.go b/internal/api/handlers/update.go index 839fdcf5..036ef3a5 100644 --- a/internal/api/handlers/update.go +++ b/internal/api/handlers/update.go @@ -16,6 +16,7 @@ package handlers import ( + "context" "net/http" "strings" @@ -127,13 +128,26 @@ func HandleUpdateApply(updater *update.Manager) http.HandlerFunc { func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - sseStream, err := render.NewSSEStream(r.Context(), w) + // Handle HEAD requests with erly return + if r.Method == http.MethodHead { + render.EncodeResponse(w, http.StatusOK, nil) + return + } + + slog.Info("Client connected to SSE stream", slog.String("client", r.RemoteAddr), slog.String("path", r.URL.Path), slog.String("method", r.Method)) + sseStream, err := render.NewSSEStream(context.WithValue(r.Context(), "remote_addr", r.RemoteAddr), w) if err != nil { slog.Error("Unable to create SSE stream", slog.String("error", err.Error())) render.EncodeResponse(w, http.StatusInternalServerError, models.ErrorResponse{Details: "unable to create SSE stream"}) return } - defer sseStream.Close() + defer func() { + sseStream.Close() + slog.Info("SSE stream closed", slog.String("client", r.RemoteAddr)) + }() + + // Send an initial heartbeat to return a feedback to the client + sseStream.Send(render.SSEEvent{Type: update.UpgradeLineEvent.String(), Data: "connected to update events stream"}) ch := updater.Subscribe() defer updater.Unsubscribe(ch) @@ -142,9 +156,10 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { select { case event, ok := <-ch: if !ok { - slog.Info("APT event channel closed, stopping SSE stream") + slog.Info("event channel closed, stopping SSE stream") return } + slog.Debug("Sending update event to SSE stream", slog.String("type", event.Type.String()), slog.String("client", r.RemoteAddr)) if event.Type == update.ErrorEvent { err := event.GetError() code := render.InternalServiceErr @@ -163,6 +178,7 @@ func HandleUpdateEvents(updater *update.Manager) http.HandlerFunc { } case <-r.Context().Done(): + slog.Info("context killed SSE stream", slog.String("client", r.RemoteAddr)) return } } diff --git a/internal/render/sse.go b/internal/render/sse.go index 3f269f9a..4a467ebd 100644 --- a/internal/render/sse.go +++ b/internal/render/sse.go @@ -39,7 +39,7 @@ type SSEErrorData struct { type SSEEvent struct { Type string `json:"type"` - Data any `json:"data"` + Data any `json:"data,omitempty"` } func NewErrorEvent(data any) SSEEvent { @@ -114,11 +114,11 @@ func (s *SSEStream) loop() { for { select { case <-s.ctx.Done(): - slog.Debug("stream SSE request context done") + slog.Debug("stream SSE request context done", slog.Any("remote_addr", s.ctx.Value("remote_addr"))) return case <-time.After(s.heartbeatInterval): if err := s.heartbeat(); err != nil { - slog.Error("failed to send ping", slog.Any("error", err)) + slog.Error("failed to send ping", slog.Any("error", err), slog.Any("remote_addr", s.ctx.Value("remote_addr"))) return } case event, canProduce := <-s.messageCh: @@ -127,7 +127,7 @@ func (s *SSEStream) loop() { return } if err := s.send(event); err != nil { - slog.Debug("failed to send SSE event", slog.String("event", event.Type), slog.Any("error", err)) + slog.Debug("failed to send SSE event", slog.String("event", event.Type), slog.Any("error", err), slog.Any("remote_addr", s.ctx.Value("remote_addr"))) return } }