Conversation
# Module info
self._module_class = module_class
- self._config = config
+ self.config = config
Changed to match Module more closely (this is how Module's __init__ normally sets config).
| """Forward to the container's Module.set_transport RPC.""" | ||
| result, _ = self.rpc.call_sync( | ||
| f"{self.remote_name}/configure_stream", ([stream_name, str(topic)], {}) | ||
| f"{self.remote_name}/set_transport", ([stream_name, transport], {}) |
This is what allows setting transport type (and also simplifies DockerModule)
Greptile Summary

This PR restores and significantly hardens the Docker module deployment system. The core change moves container lifecycle management (build/pull/start/RPC readiness) into DockerModule.__init__.
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

sequenceDiagram
participant C as ModuleCoordinator
participant DWM as DockerWorkerManager
participant DM as DockerModule.__init__
participant WM as WorkerManager
participant STM as safe_thread_map
C->>WM: deploy_parallel(worker_specs)
WM->>STM: safe_thread_map(assignments, deploy_fn, on_errors)
STM-->>WM: [RPCClient, ...]
WM-->>C: worker_results
C->>DWM: deploy_parallel(docker_specs)
DWM->>STM: safe_thread_map(specs, DockerModule(...), on_errors)
par For each Docker spec
STM->>DM: DockerModule(cls, *args, **kwargs)
DM->>DM: build_image / docker pull
DM->>DM: docker run -d
DM->>DM: rpc.start() + _wait_for_rpc()
DM-->>STM: DockerModule instance
end
alt Any failure
STM->>DWM: on_errors(outcomes, successes, errors)
DWM->>DM: mod.stop() for each success
DWM-->>C: raise ExceptionGroup
else All succeed
STM-->>DWM: [DockerModule, ...]
DWM-->>C: docker_results
end
C->>C: reassemble results + register _deployed_modules
Note over C: start_all_modules()
C->>C: ThreadPoolExecutor.map(m.start(), modules)
par For each module
C->>DM: DockerModule.start() → rpc.call_sync(".../start")
C->>WM: RPCClient.start() → RPC call
end
Last reviewed commit: 55cc94c
dimos/core/module_coordinator.py
Outdated
if docker_specs:
    with ThreadPoolExecutor(max_workers=len(docker_specs)) as executor:
        docker_results = list(
            executor.map(
                lambda spec: DockerModule(spec[0], *spec[1], **spec[2]), docker_specs
            )
        )
finally:
    # Reassemble results in original input order
    results: list[Any] = [None] * len(module_specs)
    for idx, mod in zip(worker_indices, worker_results, strict=False):
        results[idx] = mod
    for idx, mod in zip(docker_indices, docker_results, strict=False):
        results[idx] = mod
Container leak on partial parallel Docker deployment failure
When executor.map is used with list() and one DockerModule.__init__ raises, list() propagates the first exception immediately, discarding the already-computed results for successfully-started containers. docker_results is never reassigned from [], so those containers are registered in neither docker_results nor _deployed_modules — they become orphaned running containers with no cleanup path.
Concrete example with two docker specs:
- Future 0 completes → container A starts successfully
- Future 1 raises → `list(executor.map(...))` re-raises, and `docker_results` stays `[]`
- `finally` block: `docker_results` is `[]` → container A is not registered → leaked
The only containers that DockerModule.__init__'s own except/_cleanup() guard removes are those whose __init__ actually raised; successful ones have no one to call stop() on them.
Consider collecting futures explicitly so partial results can always be returned to the finally block:
futures = {executor.submit(lambda s=spec: DockerModule(s[0], *s[1], **s[2])): i
           for i, spec in zip(docker_indices, docker_specs)}
first_exc = None
for fut in concurrent.futures.as_completed(futures):
    idx = futures[fut]
    try:
        docker_results_by_idx[idx] = fut.result()
    except Exception as e:
        if first_exc is None:
            first_exc = e
# then re-raise first_exc if present, after registering all partial results
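The difference between the two patterns is easy to demonstrate in isolation. In this toy sketch (nothing here is from the PR — `work` merely stands in for DockerModule.__init__), `list(executor.map(...))` drops the completed result, while `submit` + `as_completed` keeps it:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(n: int) -> int:
    # stands in for DockerModule.__init__: item 1 fails, the others "start a container"
    if n == 1:
        raise RuntimeError("boom")
    return n * 10

items = [0, 1, 2]

# Pattern 1: list(executor.map(...)) — the first exception discards everything.
partial_map: list[int] = []
try:
    with ThreadPoolExecutor(max_workers=3) as ex:
        partial_map = list(ex.map(work, items))
except RuntimeError:
    pass  # partial_map is still [] — item 0's result is lost

# Pattern 2: submit + as_completed — completed results survive the failure.
results: dict[int, int] = {}
errors: list[Exception] = []
with ThreadPoolExecutor(max_workers=3) as ex:
    futures = {ex.submit(work, n): n for n in items}
    for fut in as_completed(futures):
        try:
            results[futures[fut]] = fut.result()
        except Exception as e:
            errors.append(e)

print(partial_map)              # []
print(sorted(results.items()))  # [(0, 0), (2, 20)]
print(len(errors))              # 1
```

The leaked "container" here is item 0's result: pattern 1 computed it but has no way to hand it back for cleanup; pattern 2 always does.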
is this a problem for self._client.deploy_parallel too? (look at its implementation) @greptile-apps
Yes, WorkerManager.deploy_parallel has the same category of leak:
# dimos/core/worker_manager.py:43-56
def deploy_parallel(...) -> list[RPCClient]:
    workers: list[Worker] = []
    for module_class, args, kwargs in module_specs:
        worker = Worker(module_class, args=args, kwargs=kwargs)
        worker.start_process()  # ← starts a process
        workers.append(worker)
    for worker in workers:
        worker.wait_until_ready()  # ← can raise
        self._workers.append(worker)  # ← registration for cleanup
    return [worker.get_instance() for worker in workers]

If wait_until_ready() raises (timeout, crash, etc.), all workers in the local workers list that haven't been appended to self._workers yet have running processes but won't be cleaned up by close_all().
Same root cause: partial success isn't tracked for cleanup when exceptions abort the registration loop.
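One minimal mitigation for that root cause (a sketch with toy stand-ins — FakeWorker and FakeManager are illustrative, and this is not the PR's actual fix, which routes both managers through safe_thread_map) is to register each worker for cleanup as soon as its process starts, before anything can raise:

```python
class FakeWorker:
    """Toy stand-in for Worker: start_process() always works, readiness may fail."""
    def __init__(self, name: str, ready: bool) -> None:
        self.name, self.ready = name, ready

    def start_process(self) -> None:
        pass  # would spawn the real worker process

    def wait_until_ready(self) -> None:
        if not self.ready:
            raise TimeoutError(f"{self.name} never became ready")


class FakeManager:
    def __init__(self) -> None:
        self._workers: list[FakeWorker] = []

    def deploy_parallel(self, specs: list[tuple[str, bool]]) -> list[FakeWorker]:
        workers = []
        for name, ready in specs:
            w = FakeWorker(name, ready)
            w.start_process()
            self._workers.append(w)  # register *before* wait_until_ready can raise
            workers.append(w)
        for w in workers:
            w.wait_until_ready()
        return workers


m = FakeManager()
try:
    m.deploy_parallel([("a", True), ("b", False)])
except TimeoutError:
    pass
# Both workers are tracked, so a close_all()-style sweep could still stop them.
print(len(m._workers))  # 2
```

With registration moved ahead of the readiness wait, a failed deploy leaves every started process reachable by the manager's cleanup path.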
okay, check the new implementation(s) now
module_name = spec.name
else:
    # Fallback: derive from file path relative to cwd
    main_file = getattr(__main__, "__file__", None)
This fallback case was a Claude side effect; it probably doesn't need to be this complex, but it looked fine.
try:
    self.rpc.call_sync(
        f"{self.remote_name}/start", ([], {}), rpc_timeout=RPC_READY_TIMEOUT
start has side effects, so it's not great to call it in a loop, which is why this was switched to get_rpc_method_names.
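A readiness probe along these lines retries an idempotent introspection RPC instead of one with side effects. This is a hypothetical sketch — wait_for_rpc, probe, and the timings are illustrative, not the PR's code:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def wait_for_rpc(probe: Callable[[], T], timeout_s: float = 30.0,
                 interval_s: float = 0.5) -> T:
    """Poll a side-effect-free RPC (e.g. get_rpc_method_names) until it answers."""
    deadline = time.monotonic() + timeout_s
    last_exc: Exception | None = None
    while time.monotonic() < deadline:
        try:
            return probe()
        except Exception as e:  # server not up yet — safe to retry, probe is idempotent
            last_exc = e
            time.sleep(interval_s)
    raise TimeoutError("RPC server not ready") from last_exc

# Toy probe that fails twice before the "server" comes up:
calls = {"n": 0}
def probe() -> list[str]:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("not yet")
    return ["start", "stop"]

print(wait_for_rpc(probe, timeout_s=5.0, interval_s=0.01))  # ['start', 'stop']
```

Because the probe has no side effects, retrying it any number of times is harmless — which is exactly what makes it loop-safe where start is not.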
_n: int | None = None
_memory_limit: str = "auto"
- _deployed_modules: dict[type[Module], ModuleProxy]
+ _deployed_modules: dict[type[Module], ModuleProxyProtocol]
This type had to be broadened to support DockerModule; it more or less only shows its effects internally, though.
dimos/core/module_coordinator.py
Outdated
def deploy_parallel(
    self, module_specs: list[tuple[type[ModuleT], tuple[Any, ...], dict[str, Any]]]
) -> list[ModuleProxy]:
    from dimos.core.docker_runner import DockerModule, is_docker_module
This might be personal preference, but I'd rather not have this inline import; it feels like it could go to the top-level imports.
It's avoiding a circular import:
module_coordinator → docker_runner → module → blueprints → module_coordinator
I'll add a comment
self._stop_rpc_client = None


class ModuleProxyProtocol(Protocol):
Needed to add this to have an umbrella type for DockerModule and RPCClient. Ideally we'd just use ModuleProxy itself, but that doesn't work for a handful of reasons that I'm too tired to write out at this point (just follow the import traces and attr inheritance).
actor = worker.deploy_module(module_class, args=args, kwargs=kwargs)
return RPCClient(actor, module_class)


with ThreadPoolExecutor(max_workers=len(assignments)) as pool:
Given [A-finished, B-error, C-pending], C won't get properly cleaned up because B will throw immediately; safe_thread_map fixes that.
Also I added tests to cover this case so we don't get a regression later
COPY examples/docker_hello_world/hello_docker.py /dimos/source/examples/docker_hello_world/hello_docker.py
RUN touch /dimos/source/examples/__init__.py /dimos/source/examples/docker_hello_world/__init__.py

WORKDIR /app
| """ | ||
| Hello World Docker Module | ||
| ========================== | ||
Example covering an input stream, an output stream, an RPC call, and config value passing.
@greptile-apps please rescore/recheck the PR
@@ -0,0 +1,195 @@
# Copyright 2026 Dimensional Inc.
@@ -0,0 +1,219 @@
# Copyright 2026 Dimensional Inc.
Meaningful / rigorous tests.
return safe_thread_map(
    specs, lambda spec: DockerModule(spec[0], *spec[1], **spec[2]), _on_errors
)
I don't like this class (I don't really like that WorkerManager is a class either), but I'll follow Chesterton's fence in this case.
I think adding self._docker_client (next to self._client in ModuleCoordinator) is too far, though.
| R = TypeVar("R") | ||
|
|
||
|
|
||
| def safe_thread_map( |
A generic tool used by both DockerWorkerManager and WorkerManager: parallel execution with correct cleanup.
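The helper's full signature isn't shown in this diff, but the idea can be sketched like this (the run wrapper, the ("ok"/"err") outcome encoding, and the exact on_errors contract are assumptions for illustration, not the PR's implementation):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")

def safe_thread_map(
    items: Sequence[T],
    fn: Callable[[T], R],
    on_errors: Callable[[list[Any], list[R], list[Exception]], None],
) -> list[R]:
    """Run fn over items in parallel, recording every outcome so a failure
    in one item never hides the successes that still need cleanup."""
    outcomes: list[Any] = [None] * len(items)

    def run(i: int) -> None:
        # never raises: each outcome is captured as a tagged tuple instead
        try:
            outcomes[i] = ("ok", fn(items[i]))
        except Exception as e:
            outcomes[i] = ("err", e)

    if items:
        with ThreadPoolExecutor(max_workers=len(items)) as pool:
            list(pool.map(run, range(len(items))))

    successes = [v for tag, v in outcomes if tag == "ok"]
    errors = [v for tag, v in outcomes if tag == "err"]
    if errors:
        # caller decides what to do: e.g. stop every success, then raise an ExceptionGroup
        on_errors(outcomes, successes, errors)
    return successes
```

With this shape, a Docker-flavored _on_errors can call mod.stop() on every success before raising, which is exactly the cleanup path the comments above describe.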
# Ignore build artifacts
dist/
build/
.Dockerfile.dimos
I don't like that this artifact is generated, but I've already made enough changes to the pre-existing DockerModule and I need to get rosnav done.
This just prevents build artifacts from piling up.
@greptile-apps recheck
reconnect = False
if _is_container_running(config, self._container_name):
    if config.docker_reconnect_container:
        logger.info(f"Reconnecting to running container: {self._container_name}")
        reconnect = True
    else:
        logger.info(f"Stopping existing container: {self._container_name}")
        _run(
            [_docker_bin(config), "stop", self._container_name],
            timeout=DOCKER_STOP_TIMEOUT,
        )

if not reconnect:
    _remove_container(config, self._container_name)
    cmd = self._build_docker_run_command()
    logger.info(f"Starting docker container: {self._container_name}")
    r = _run(cmd, timeout=DOCKER_RUN_TIMEOUT)
    if r.returncode != 0:
        raise RuntimeError(
            f"Failed to start container.\nSTDOUT:\n{r.stdout}\nSTDERR:\n{r.stderr}"
        )

self.rpc.start()
self._running = True
# docker run -d returns before Module.__init__ finishes in the container,
# so we poll until the RPC server is reachable before returning.
self._wait_for_rpc()
except Exception:
    with suppress(Exception):
        self._cleanup()
Reconnect mode destroys the pre-existing container on cleanup failure
When docker_reconnect_container=True and the existing container is reached (reconnect = True), the _remove_container / docker run steps are correctly skipped. However, if _wait_for_rpc() subsequently raises (e.g. RPC timeout), the except Exception block calls _cleanup(), which unconditionally runs both docker stop <name> and docker rm -f <name> — permanently destroying the container the caller wanted to preserve.
# _cleanup() always does:
_run([_docker_bin(self.config), "stop", self._container_name], ...)
_remove_container(self.config, self._container_name)

A user who sets docker_reconnect_container=True almost certainly wants the container to survive a failed reconnect attempt. Consider tracking whether __init__ created the container itself (vs reconnected to one that was already running) and only stopping/removing it in _cleanup() when DockerModule was responsible for starting it:
self._owns_container: bool = not reconnect  # set just before rpc.start()

# _cleanup():
if self._owns_container:
    with suppress(Exception):
        _run([_docker_bin(self.config), "stop", self._container_name], ...)
    with suppress(Exception):
        _remove_container(self.config, self._container_name)

def start_all_modules(self) -> None:
    modules = list(self._deployed_modules.values())
-   if isinstance(self._client, WorkerManager):
-       with ThreadPoolExecutor(max_workers=len(modules)) as executor:
-           list(executor.map(lambda m: m.start(), modules))
-   else:
-       for module in modules:
-           module.start()
+   if not modules:
+       raise ValueError("No modules deployed. Call deploy() before start_all_modules().")
+   with ThreadPoolExecutor(max_workers=len(modules)) as executor:
+       list(executor.map(lambda m: m.start(), modules))
start_all_modules silently discards all-but-first exception
executor.map returns an ordered iterator; list() re-raises the first exception it encounters and discards all later ones. When two or more module.start() calls fail concurrently (e.g. two Docker RPC calls time out), only the first failure is surfaced to the caller — the rest are silently swallowed, making diagnosis very hard.
DockerModule.start() does call self.stop() on its own failure, so containers are cleaned up individually, but the exception itself is lost.
Consider replacing with safe_thread_map (already available in this PR) and an on_errors handler that re-raises an ExceptionGroup containing all failures:
def _start_one(m):
    m.start()

def _on_start_errors(outcomes, successes, errors):
    raise ExceptionGroup("start_all_modules failed", errors)

safe_thread_map(modules, _start_one, _on_start_errors)

worker_results: list[Any] = []
docker_results: list[Any] = []
try:
    worker_results = self._client.deploy_parallel(worker_specs)
    docker_results = DockerWorkerManager.deploy_parallel(docker_specs)  # type: ignore[arg-type]
finally:
    # Reassemble results in original input order
    results: list[Any] = [None] * len(module_specs)
    for idx, mod in zip(worker_indices, worker_results, strict=False):
        results[idx] = mod
    for idx, mod in zip(docker_indices, docker_results, strict=False):  # type: ignore[assignment]
        results[idx] = mod
    # Register whatever succeeded so stop() can clean them up
    for (module_class, _, _), module in zip(module_specs, results, strict=False):
        if module is not None:
            self._deployed_modules[module_class] = module

return results
Worker and Docker deploys are sequential, not parallel
deploy_parallel first blocks on self._client.deploy_parallel(worker_specs), and only then calls DockerWorkerManager.deploy_parallel(docker_specs). Since DockerModule.__init__ (which does the actual image build/pull, container start and RPC readiness wait) now runs inside deploy_parallel, a mixed spec list (worker + Docker modules) will block for all worker processes before beginning the Docker container starts — potentially adding minutes to startup for scenarios like the rosnav stack.
If true parallel launch of both groups is desired, consider submitting both into a single ThreadPoolExecutor (or composing two safe_thread_map calls with run_in_background) before joining results.
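A minimal shape for that joint launch could look like this (deploy_both, worker_fn, and docker_fn are hypothetical names for illustration, not code from the PR): submit both group deploys to one executor and join both futures.

```python
from concurrent.futures import ThreadPoolExecutor

def deploy_both(worker_fn, docker_fn):
    """Launch the worker-group and docker-group deploys concurrently, then join.
    Either future's exception propagates when .result() is called."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        worker_fut = pool.submit(worker_fn)
        docker_fut = pool.submit(docker_fn)
        return worker_fut.result(), docker_fut.result()

# Toy stand-ins for the two deploy_parallel calls:
workers, dockers = deploy_both(lambda: ["w1"], lambda: ["d1", "d2"])
print(workers, dockers)  # ['w1'] ['d1', 'd2']
```

Note that if worker_fut raises, docker_fut still runs to completion inside the with block, so its successes would still need the same safe_thread_map-style cleanup discussed above.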
Draft because I need to test this with the rosnav
Problem(s)
- configure_stream does not support different transport types (blocker for DIM-569)

Closes DIM-662
Solution
- Reworked ModuleCoordinator to restore functionality: deploy docker modules in parallel
- Removed configure_stream (not needed) in favor of set_transport, which now works as intended to allow different stream types

Breaking Changes
None
How to Test
python ./examples/docker_hello_world/hello_docker.py

Contributor License Agreement