dockerd likely remains stuck in a socket churn loop. A diagnosis report and patch proposal were created locally.
Published session
Prompt
## Plan Pass
You are planning a fixer patch before any edits happen.
Read the evidence bundle at `./evidence.json`. The prepared workspace is `./workspace` and it was acquired via `debian-source`. The original pre-edit snapshot is available at `./source` if you need to inspect it. For interpreter processes, plan from the script/application entrypoint evidence first and include the runtime only as a second investigation target unless the evidence proves a runtime bug.
Upstream-style expectation: before planning or editing, check for contribution/style docs (`CONTRIBUTING`, `HACKING`, `README-hacking`, `README.md`, `docs/`, `dev-docs/`) and scan the touched subsystem for local helpers. If the project has wrappers for file IO, path-relative IO, process spawning, memory allocation, logging, locking, or platform compatibility, prefer those wrappers over generic libc/std APIs. Do not invent a reproducer or user-visible failure that is not in the evidence bundle; if the evidence is profiler-only or indirect, describe it as a targeted mitigation or stop with a diagnosis instead of presenting a speculative patch as a confirmed bug fix. In the plan and final validation, name any such helper, convention, or evidence limit you found, or say that no relevant local helper was found. Treat this as a `docker.io` upstream patch, not just a Debian-local workaround. Inspect the relevant code, nearby callers, project contribution docs, and local helper/compat APIs, but do not edit files in this pass.
Return a short markdown plan with these exact sections:
## Problem
## Evidence Confidence
## Proposed Subject
## Patch Plan
## Risks
## Validation
Classify `## Evidence Confidence` as exactly one of `reproduced`, `observed`, or `inferred`. Use `inferred` only for a no-patch diagnosis/report plan unless you can name the extra evidence you will collect before editing; inferred source patches are blocked by Fixer because they are not pull-request-ready. For `observed` source-patch plans, plan to say in the final `## Issue Connection` that the failure was observed by Fixer and not independently reproduced. The plan must explain how the proposed code change addresses the observed issue evidence, call out any prior Fixer patch that should be improved or replaced, reject awkward control flow such as avoidable `goto` if there is a cleaner bounded alternative, name any local helper APIs or maintainer conventions the patch should follow, and keep the intended maintainer-facing explanation clear enough that someone unfamiliar with the local complaint wording can still follow the fix. In `## Validation`, name the reproducible configure/build/test entrypoint you will try from the workspace root before any focused leaf compile or smoke check, and include one bounded independent reproduction attempt for the collected failure signal when it is safe and cheap. Do not plan to claim `reproduced` unless that reproduction command or test can actually show the failure.
## Patch Pass
You are working on a bounded fixer proposal.
Read the evidence bundle at `./evidence.json`. The prepared workspace is `./workspace` and it was acquired via `debian-source`. Produce the smallest reasonable patch for the target repository, keep the change upstreamable, prefer the clearest control flow available, and do not keep avoidable `goto` when a simpler structure would read better. Before introducing new file, process, allocation, locking, networking, or platform APIs, inspect nearby code and project contribution docs for existing helpers or compatibility wrappers and use those local patterns unless you can explain why they do not fit. Validate from a reproducible workspace-root entrypoint before falling back to focused leaf commands; if a build or test cannot run, report the exact command, the exact blocker, and any narrower check you ran instead. During validation, also try one bounded independent reproduction of the collected failure signal when it is safe and cheap, such as a failing test, smoke command, perf/strace comparison, or before/after runtime check. Only use `reproduced` if that command or test actually reproduced the failure; otherwise keep `observed` and report the reproduction blocker. The final explanation must connect the observed issue evidence to the actual code change, not just paraphrase the diff. Write like a maintainer is going to read the patch mail cold: explain the bug in plain language, define subsystem-specific jargon the first time you need it, and make the causal story obvious. Explicitly classify evidence confidence as `reproduced`, `observed`, or `inferred`: `reproduced` means you reproduced the failure locally; `observed` means Fixer has direct crash/log/trace evidence but you did not independently reproduce it; `inferred` means the source patch is not pull-request-ready, so do not leave a source diff unless you first gather stronger observed/reproduced evidence; otherwise return a no-patch diagnosis/report. For any source-changing `observed` patch, say explicitly in `## Issue Connection` that the failure was observed by Fixer and not independently reproduced. If you introduce non-obvious state translation, index remapping, or backend split logic, add a short source comment that explains the invariant being preserved.
Start by explaining the likely root cause from the collected perf, strace, and /proc evidence. If you cannot land a safe patch, leave a diagnosis that is strong enough for an upstream bug report.
Upstream-style expectation: before planning or editing, check for contribution/style docs (`CONTRIBUTING`, `HACKING`, `README-hacking`, `README.md`, `docs/`, `dev-docs/`) and scan the touched subsystem for local helpers. If the project has wrappers for file IO, path-relative IO, process spawning, memory allocation, logging, locking, or platform compatibility, prefer those wrappers over generic libc/std APIs. Do not invent a reproducer or user-visible failure that is not in the evidence bundle; if the evidence is profiler-only or indirect, describe it as a targeted mitigation or stop with a diagnosis instead of presenting a speculative patch as a confirmed bug fix. In the plan and final validation, name any such helper, convention, or evidence limit you found, or say that no relevant local helper was found. Treat this as a `docker.io` upstream patch, not just a Debian-local workaround.
Keep the change narrowly scoped and summarize validation clearly.
In every authoring pass, your final response must start with `Subject: <single-line git commit subject>` and then include these markdown sections exactly:
## Commit Message
A short upstream-friendly explanation of what changed and why. Write it in plain language that a maintainer can follow without local complaint context. If you use subsystem jargon, define it immediately.
## Evidence Confidence
Exactly one word: `reproduced`, `observed`, or `inferred`. Use `reproduced` only when you reproduced the failure locally with a command or test, and include that command/test in `## Validation`. Use `observed` when Fixer has direct crash/log/trace evidence but you did not independently reproduce it. If `## Git Add Paths` lists source files for an `observed` patch, `## Issue Connection` must explicitly say the failure was observed by Fixer and not independently reproduced. Use `inferred` for profiler/strace/indirect evidence; inferred responses may be no-patch diagnoses or reports, but inferred source patches are not pull-request-ready until stronger evidence is gathered.
## Issue Connection
Write this as maintainer-facing patch mail, not as local Fixer notes. Cover four things explicitly in readable sentences: the user-visible symptom or the exact collected signal, the code-level cause or the cautious inference from evidence, the specific change you made, and the expected effect. Do not invent a reproducer, command line, crash, or user-visible failure that is not present in the evidence bundle. If the evidence is direct-but-not-reproduced, say it was observed by Fixer and not independently reproduced. If the evidence is indirect and you did not gather stronger evidence, do not leave a source diff; write a no-patch diagnosis/report instead. Include an explicit effect sentence such as `The expected effect is ...`, `This should reduce ...`, or `This prevents ...` for source patches. If the logic is non-obvious in code, mention that you added a short explanatory comment.
## Git Add Paths
List the repo-relative paths that belong in the final patch, one per line. Use `None` only when you intentionally made no source changes. Include intentionally new files, and do not list generated build artifacts.
## Validation
List the checks you ran, or say clearly that you could not run them. Include the independent reproduction command/test and result when `## Evidence Confidence` is `reproduced`; if reproduction was attempted but blocked, name the exact blocker and keep confidence at `observed` or `inferred`.
Before editing, read the plan at `./plan-output.txt` and follow it unless the code proves part of it wrong. If you change course, say so explicitly in the final write-up instead of silently drifting from the plan.
## Review Pass 1
You are reviewing a freshly generated fixer patch.
Read the evidence bundle at `./evidence.json`. The prepared workspace is `./workspace` and it was acquired via `debian-source`. Review the first patch pass. The original pre-edit snapshot is available at `./source` for diffing. The workspace currently changes these repo-relative paths: engine/libcontainerd/remote/client.go. Verify that `## Git Add Paths` matches this exact set and that `## Issue Connection` explains every shipped functional file.
Upstream-style expectation: before planning or editing, check for contribution/style docs (`CONTRIBUTING`, `HACKING`, `README-hacking`, `README.md`, `docs/`, `dev-docs/`) and scan the touched subsystem for local helpers. If the project has wrappers for file IO, path-relative IO, process spawning, memory allocation, logging, locking, or platform compatibility, prefer those wrappers over generic libc/std APIs. Do not invent a reproducer or user-visible failure that is not in the evidence bundle; if the evidence is profiler-only or indirect, describe it as a targeted mitigation or stop with a diagnosis instead of presenting a speculative patch as a confirmed bug fix. In the plan and final validation, name any such helper, convention, or evidence limit you found, or say that no relevant local helper was found. Treat this as a `docker.io` upstream patch, not just a Debian-local workaround. The latest author response is at `./patch-output.txt`. Inspect the current code and changed paths like a strict code reviewer. Focus on correctness, regressions, maintainability, awkward control flow such as avoidable `goto`, missing validation, weak or non-gittable commit message text, and explanations that fail to connect the observed issue evidence to the code change. Also review the maintainer experience: the patch mail should be easy to accept upstream, the user-visible bug should be explained in plain language, subsystem-specific jargon should be defined when first used, and any non-obvious state translation or index remapping in code should have a short explanatory comment. Reject patches that omit `## Evidence Confidence`, use anything other than `reproduced`, `observed`, or `inferred`, or sound more certain than that evidence level allows. Reject patches that claim `reproduced` unless `## Validation` names the concrete reproduction command or test and its result. Reject patches that invent a reproducer, command line, crash, or user-visible failure not present in the evidence bundle; indirect profiler or strace evidence may justify a cautious mitigation, but the author must say it is indirect instead of presenting it as a confirmed upstream bug. Reject patches that introduce generic libc/std APIs when nearby code or project docs provide a local compat/helper API for the same job, unless the author explicitly justifies the exception. Reject validation that only reports a leaf object/syntax build when a project-level configure/build/test entrypoint exists and was not attempted; if a full build cannot run, the author must show the exact failed command and blocker.
Do not apply code changes in this pass.
Return a short markdown review report. The first non-empty line must be exactly one of:
RESULT: ok
RESULT: fix-needed
If you choose `RESULT: fix-needed`, add a `## Findings` section with concrete, actionable items.
## Refinement Pass 1
You are refining a fixer patch after an explicit code review.
Read the evidence bundle at `./evidence.json`. The prepared workspace is `./workspace` and it was acquired via `debian-source`. Read the latest author response at `./patch-output.txt`. Read the review report at `./review-1-output.txt`. This is refinement round 1. The original pre-edit snapshot is available at `./source` if you need to compare the current patch against it. Re-read the planning pass at `./plan-output.txt` before editing. The workspace currently changes these repo-relative paths: engine/libcontainerd/remote/client.go. Either keep that exact set synchronized with `## Git Add Paths` and `## Issue Connection`, or revert any unintended file before you answer.
Upstream-style expectation: before planning or editing, check for contribution/style docs (`CONTRIBUTING`, `HACKING`, `README-hacking`, `README.md`, `docs/`, `dev-docs/`) and scan the touched subsystem for local helpers. If the project has wrappers for file IO, path-relative IO, process spawning, memory allocation, logging, locking, or platform compatibility, prefer those wrappers over generic libc/std APIs. Do not invent a reproducer or user-visible failure that is not in the evidence bundle; if the evidence is profiler-only or indirect, describe it as a targeted mitigation or stop with a diagnosis instead of presenting a speculative patch as a confirmed bug fix. In the plan and final validation, name any such helper, convention, or evidence limit you found, or say that no relevant local helper was found. Treat this as a `docker.io` upstream patch, not just a Debian-local workaround. Address the review findings with the smallest reasonable follow-up changes. If the review identifies a runtime or correctness bug in the changed code, you must update the code itself before answering; a metadata-only response is not sufficient. Keep the patch upstream-friendly, use local project helpers and compat APIs when available, avoid awkward control flow when a simpler structure will do, keep the final response gittable, make the maintainer-facing explanation plain and direct, keep `## Evidence Confidence` honest, add short comments for any non-obvious translation or remapping logic, run relevant project-level build/tests from the workspace root before narrower checks when possible, try a bounded independent reproduction command/test if the review or plan shows one is safe, and summarize which review findings you addressed.
In every authoring pass, your final response must start with `Subject: <single-line git commit subject>` and then include these markdown sections exactly:
## Commit Message
A short upstream-friendly explanation of what changed and why. Write it in plain language that a maintainer can follow without local complaint context. If you use subsystem jargon, define it immediately.
## Evidence Confidence
Exactly one word: `reproduced`, `observed`, or `inferred`. Use `reproduced` only when you reproduced the failure locally with a command or test, and include that command/test in `## Validation`. Use `observed` when Fixer has direct crash/log/trace evidence but you did not independently reproduce it. If `## Git Add Paths` lists source files for an `observed` patch, `## Issue Connection` must explicitly say the failure was observed by Fixer and not independently reproduced. Use `inferred` for profiler/strace/indirect evidence; inferred responses may be no-patch diagnoses or reports, but inferred source patches are not pull-request-ready until stronger evidence is gathered.
## Issue Connection
Write this as maintainer-facing patch mail, not as local Fixer notes. Cover four things explicitly in readable sentences: the user-visible symptom or the exact collected signal, the c
[truncated 4653 bytes]
Response
## Plan Pass
## Problem
Fixer observed `dockerd` with 185 threads, mostly sleeping in futexes, with strace dominated by `futex`, `nanosleep`, `epoll_pwait`, `waitid`, and a few nonblocking `recvmsg(...)=EAGAIN` calls. The bundle labels this as likely socket churn, but the backtrace is stripped and the raw artifact paths are unavailable in this workspace, so the source connection is limited.
The most relevant upstream source path is `engine/libcontainerd/remote/client.go`: `processEventStream` subscribes to containerd task events and, on stream error, waits for `IsServing` and then starts a replacement goroutine immediately. It also reads from `eventStream`/`errC` without checking channel closure. No prior Fixer patch was present beyond the `Fixer baseline` commit.
## Evidence Confidence
observed
## Proposed Subject
`libcontainerd: bound containerd event stream resubscribe churn`
## Patch Plan
Change `engine/libcontainerd/remote/client.go` so containerd event stream recovery stays in one bounded loop instead of recursively launching `go c.processEventStream(...)` on each recoverable stream error.
Handle closed `eventStream` and `errC` channels explicitly with the `ok` receive form. Treat unexpected closure like a stream failure, then wait for `waitServe(ctx)` and apply a small backoff before resubscribing. Avoid `goto`; use a helper or a clear outer `for` loop with per-subscription `context.WithCancel`.
Follow local conventions already in this subsystem: `containerd/log` via `c.logger`, `context.WithCancel` for the subscription lifetime, `time.Timer`/`time.Duration` style as used by `waitServe`, and existing error handling with `status.FromError`, `codes.Canceled`, and `context` cancellation checks.
Add focused unit coverage if a small seam can be introduced without broad refactor: simulate a stream that closes/errors repeatedly and assert the loop does not spawn replacement goroutines or spin without delay. If `containerd.Client.EventService()` is too concrete for clean faking, keep the code change minimal and validate by package build/tests plus a bounded smoke run.
## Risks
The evidence does not prove this exact source path because addresses are stripped and raw artifacts are missing. The patch should be presented as a targeted mitigation for Fixer-observed containerd event stream/socket churn, not as an independently reproduced daemon failure.
Adding backoff can delay event stream recovery slightly after containerd restarts. Keep the delay small and reset it after a successful event receive.
## Validation
From the workspace root, try the upstream test entrypoint first: `cd engine && make test-unit TESTDIRS=./libcontainerd/remote`.
Then run focused checks: `cd engine && go test ./libcontainerd/remote` and `cd engine && go test ./libcontainerd/queue ./libcontainerd/...` if dependency cost is reasonable.
Bounded independent reproduction attempt: before claiming reproduction, run a small targeted test or temporary local harness that forces repeated event stream closure/error and watches for immediate resubscribe churn or goroutine growth. If that cannot be done cleanly, final `## Issue Connection` must say the failure was observed by Fixer and not independently reproduced.
## Patch Pass
Subject: libcontainerd: bound containerd event stream reconnects
## Commit Message
Keep containerd task event stream recovery inside one goroutine and handle closed event channels explicitly. If the stream drops, dockerd now cancels the old subscription, waits for containerd to report ready, and delays briefly before resubscribing instead of spawning a replacement event processor immediately.
## Evidence Confidence
observed
## Issue Connection
Fixer observed `dockerd` with 185 threads, mostly blocked in futex waits, with strace showing repeated `futex`, `nanosleep`, `epoll_pwait`, `waitid`, and some nonblocking socket reads. This was observed by Fixer and not independently reproduced.
The cautious code-level connection is the libcontainerd task event stream, which is the daemon’s long-lived containerd event subscription for task lifecycle messages. Its recovery path spawned a new `processEventStream` goroutine on recoverable stream errors and did not distinguish closed event/error channels from real events.
This patch keeps reconnect handling in the existing goroutine, explicitly treats closed channels as a reconnect condition, preserves the existing `waitServe` readiness check, and adds a short timer before resubscribing. It follows the local `containerd/log`, `context.WithCancel`, and timer conventions already used in this subsystem. I added a short comment documenting the invariant that reconnects stay in one goroutine.
The expected effect is to reduce event-stream reconnect churn and prevent accumulation of event-processing goroutines when the containerd event subscription repeatedly fails or closes.
## Git Add Paths
engine/libcontainerd/remote/client.go
## Validation
Ran `cd engine && make test-unit TESTDIRS=./libcontainerd/remote` from the workspace root. It did not reach tests: Docker failed during the build setup with `ERROR: mkdir /root/.docker: read-only file system`.
Ran `cd engine && HOME=/tmp GOCACHE=/tmp/go-cache GOPATH=/tmp/go go test ./libcontainerd/remote`. It failed because module lookup is disabled by `-mod=readonly` and the Debian source tree does not provide the needed modules in normal module mode.
Ran `cd engine && HOME=/tmp GOCACHE=/tmp/go-cache GOPATH=/tmp/go GOFLAGS=-mod=vendor go test ./libcontainerd/remote`. It failed with inconsistent vendoring because `vendor/modules.txt` marks many modules explicit while `engine/go.mod` only declares the module.
Ran a GOPATH/vendor fallback via `/tmp/gopath`; it failed because this Debian source snapshot does not include the required containerd dependency source directories under `engine/vendor`.
Independent reproduction attempt: `pgrep -a dockerd` found no running daemon to attach a bounded `strace` to. I did not start a real daemon as a reproducer because that would alter host state. Confidence remains `observed`.
`git diff --check` passed.
## Review Pass 1
RESULT: fix-needed
## Findings
- [engine/libcontainerd/remote/client.go:741] The source comment and patch mail overstate the old behavior. The old recovery path did `go c.processEventStream(...)` and then immediately `return`, so it does not by itself “accumulate event processors”; it creates goroutine churn during repeated reconnects. Please reword the comment and `## Issue Connection` / expected effect to say the patch keeps retries in one goroutine and reduces reconnect goroutine churn, without claiming it prevents accumulation of event processors.
- The changed path set is correct: only `engine/libcontainerd/remote/client.go` is modified, and `## Git Add Paths` matches it. The validation section also attempted the documented `make test-unit TESTDIRS=./libcontainerd/remote` entrypoint and reported the exact blocker.
## Refinement Pass 1
Subject: libcontainerd: bound containerd event stream reconnects
## Commit Message
Keep containerd task event stream recovery inside one goroutine and handle closed event channels explicitly. If the stream drops, dockerd now cancels the old subscription, waits for containerd to report ready, and delays briefly before resubscribing instead of starting a replacement goroutine immediately.
Refinement round 1 addresses the review finding by rewording the source comment and patch explanation to describe reconnect goroutine churn, not accumulated event processors.
## Evidence Confidence
observed
## Issue Connection
Fixer observed `dockerd` with 185 threads, mostly blocked in futex waits, with strace showing repeated `futex`, `nanosleep`, `epoll_pwait`, `waitid`, and some nonblocking socket reads. This was observed by Fixer and not independently reproduced.
The cautious code-level connection is the libcontainerd task event stream, which is the daemon’s long-lived containerd event subscription for task lifecycle messages. Its recovery path spawned a replacement `processEventStream` goroutine on recoverable stream errors and did not distinguish closed event/error channels from real events.
This patch keeps reconnect handling in the existing goroutine, explicitly treats closed channels as a reconnect condition, preserves the existing `waitServe` readiness check, and adds a short timer before resubscribing. It follows the local `containerd/log`, `context.WithCancel`, and timer conventions already used in this subsystem. I added a short comment documenting that reconnects stay in one goroutine to avoid replacement-goroutine churn.
The expected effect is to reduce event-stream reconnect churn when the containerd event subscription repeatedly fails or closes.
## Git Add Paths
engine/libcontainerd/remote/client.go
## Validation
Ran `cd engine && make test-unit TESTDIRS=./libcontainerd/remote` from the workspace root. It did not reach tests: Docker failed during build setup with `ERROR: mkdir /root/.docker: read-only file system`.
Ran `cd engine && HOME=/tmp GOCACHE=/tmp/go-cache GOPATH=/tmp/go go test ./libcontainerd/remote`. It failed because module lookup is disabled by `-mod=readonly` and this Debian source tree does not provide the needed modules in normal module mode.
Ran `cd engine && HOME=/tmp GOCACHE=/tmp/go-cache GOPATH=/tmp/go GOFLAGS=-mod=vendor go test ./libcontainerd/remote`. It failed with inconsistent vendoring because `vendor/modules.txt` marks many modules explicit while `engine/go.mod` only declares the module.
Ran a GOPATH/vendor fallback via `/tmp/gopath`; it failed because this Debian source snapshot does not include required containerd dependency source directories under `engine/vendor`.
Independent reproduction attempt: `pgrep -a dockerd` found no running daemon to attach a bounded `strace` to. I did not start a real daemon as a reproducer because that would alter host state. Confidence remains `observed`.
`gofmt -w engine/libcontainerd/remote/client.go` and `git diff --check` passed.
## Review Pass 2
RESULT: ok
The refined patch passes review. `## Git Add Paths` matches the exact changed path set: `engine/libcontainerd/remote/client.go`.
The response keeps `## Evidence Confidence` at `observed`, explicitly says Fixer observed the failure and it was not independently reproduced, and avoids overclaiming beyond the profiler/strace evidence. The issue connection explains the shipped functional file, the event-stream recovery path being changed, and the expected effect.
Validation is acceptable for this environment: the project-level `make test-unit TESTDIRS=./libcontainerd/remote` entrypoint was attempted first and the blocker is stated, followed by narrower Go checks and `git diff --check`. The refined source comment no longer overstates accumulated processors, and I did not find a correctness or maintainability issue that should block the patch.
Diff
diff --git a/engine/libcontainerd/remote/client.go b/engine/libcontainerd/remote/client.go
index 3d3ec63..cf88ad2 100644
--- a/engine/libcontainerd/remote/client.go
+++ b/engine/libcontainerd/remote/client.go
@@ -574,6 +574,73 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
})
}
+func (c *client) processEventEnvelope(ctx context.Context, ev *apievents.Envelope) {
+ v, err := typeurl.UnmarshalAny(ev.Event)
+ if err != nil {
+ c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
+ return
+ }
+
+ c.logger.WithField("topic", ev.Topic).Debug("event")
+
+ switch t := v.(type) {
+ case *apievents.TaskCreate:
+ c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ ProcessID: t.ContainerID,
+ Pid: t.Pid,
+ })
+ case *apievents.TaskStart:
+ c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ ProcessID: t.ContainerID,
+ Pid: t.Pid,
+ })
+ case *apievents.TaskExit:
+ c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ ProcessID: t.ID,
+ Pid: t.Pid,
+ ExitCode: t.ExitStatus,
+ ExitedAt: protobuf.FromTimestamp(t.ExitedAt),
+ })
+ case *apievents.TaskOOM:
+ c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ })
+ case *apievents.TaskExecAdded:
+ c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ ProcessID: t.ExecID,
+ })
+ case *apievents.TaskExecStarted:
+ c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ ProcessID: t.ExecID,
+ Pid: t.Pid,
+ })
+ case *apievents.TaskPaused:
+ c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ })
+ case *apievents.TaskResumed:
+ c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
+ ContainerID: t.ContainerID,
+ })
+ case *apievents.TaskDelete:
+ c.logger.WithFields(log.Fields{
+ "topic": ev.Topic,
+ "type": reflect.TypeOf(t),
+ "container": t.ContainerID,
+ }).Info("ignoring event")
+ default:
+ c.logger.WithFields(log.Fields{
+ "topic": ev.Topic,
+ "type": reflect.TypeOf(t),
+ }).Info("ignoring event")
+ }
+}
+
func (c *client) waitServe(ctx context.Context) bool {
t := 100 * time.Millisecond
delay := time.NewTimer(t)
@@ -607,105 +674,83 @@ func (c *client) waitServe(ctx context.Context) bool {
}
func (c *client) processEventStream(ctx context.Context, ns string) {
- // Create a new context specifically for this subscription.
- // The context must be cancelled to cancel the subscription.
- // In cases where we have to restart event stream processing,
- // we'll need the original context b/c this one will be cancelled
- subCtx, cancel := context.WithCancel(ctx)
- defer cancel()
+ for {
+ if ctx.Err() != nil {
+ c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
+ return
+ }
- // Filter on both namespace *and* topic. To create an "and" filter,
- // this must be a single, comma-separated string
- eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
+ // Create a new context specifically for this subscription.
+ // The context must be cancelled to cancel the subscription.
+ subCtx, cancel := context.WithCancel(ctx)
- c.logger.Debug("processing event stream")
+ // Filter on both namespace *and* topic. To create an "and" filter,
+ // this must be a single, comma-separated string
+ eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
- for {
- select {
- case err := <-errC:
- if err != nil {
+ c.logger.Debug("processing event stream")
+
+ restart := false
+ for !restart {
+ select {
+ case err, ok := <-errC:
+ if !ok {
+ c.logger.Warn("containerd event error channel closed")
+ restart = true
+ break
+ }
+ if err == nil {
+ c.logger.Debug("containerd event error channel returned nil error")
+ restart = true
+ break
+ }
errStatus, ok := status.FromError(err)
- if !ok || errStatus.Code() != codes.Canceled {
- c.logger.WithError(err).Error("Failed to get event")
- c.logger.Info("Waiting for containerd to be ready to restart event processing")
- if c.waitServe(ctx) {
- go c.processEventStream(ctx, ns)
+ if ok && errStatus.Code() == codes.Canceled {
+ if ctx.Err() != nil {
+ c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
+ cancel()
return
}
+ c.logger.WithError(err).Debug("containerd event stream canceled")
+ restart = true
+ break
}
- c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
- }
- return
- case ev := <-eventStream:
- if ev.Event == nil {
- c.logger.WithField("event", ev).Warn("invalid event")
- continue
+ c.logger.WithError(err).Error("Failed to get event")
+ restart = true
+ case ev, ok := <-eventStream:
+ if !ok {
+ c.logger.Warn("containerd event stream closed")
+ restart = true
+ break
+ }
+ if ev == nil || ev.Event == nil {
+ c.logger.WithField("event", ev).Warn("invalid event")
+ continue
+ }
+ c.processEventEnvelope(ctx, ev)
}
+ }
+ cancel()
- v, err := typeurl.UnmarshalAny(ev.Event)
- if err != nil {
- c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
- continue
- }
+ c.logger.Info("Waiting for containerd to be ready to restart event processing")
+ if !c.waitServe(ctx) {
+ c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
+ return
+ }
- c.logger.WithField("topic", ev.Topic).Debug("event")
-
- switch t := v.(type) {
- case *apievents.TaskCreate:
- c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- ProcessID: t.ContainerID,
- Pid: t.Pid,
- })
- case *apievents.TaskStart:
- c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- ProcessID: t.ContainerID,
- Pid: t.Pid,
- })
- case *apievents.TaskExit:
- c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- ProcessID: t.ID,
- Pid: t.Pid,
- ExitCode: t.ExitStatus,
- ExitedAt: protobuf.FromTimestamp(t.ExitedAt),
- })
- case *apievents.TaskOOM:
- c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- })
- case *apievents.TaskExecAdded:
- c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- ProcessID: t.ExecID,
- })
- case *apievents.TaskExecStarted:
- c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- ProcessID: t.ExecID,
- Pid: t.Pid,
- })
- case *apievents.TaskPaused:
- c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- })
- case *apievents.TaskResumed:
- c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
- ContainerID: t.ContainerID,
- })
- case *apievents.TaskDelete:
- c.logger.WithFields(log.Fields{
- "topic": ev.Topic,
- "type": reflect.TypeOf(t),
- "container": t.ContainerID,
- }).Info("ignoring event")
- default:
- c.logger.WithFields(log.Fields{
- "topic": ev.Topic,
- "type": reflect.TypeOf(t),
- }).Info("ignoring event")
+ // Keep reconnects in this goroutine so a broken stream cannot churn through replacement goroutines.
+ timer := time.NewTimer(100 * time.Millisecond)
+ select {
+ case <-ctx.Done():
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
}
+ c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
+ return
+ case <-timer.C:
}
}
}