Skip to content

mcp_runtime

MCPClientManager

Threaded manager for outbound MCP stdio client sessions.

Architectural rules (enforced by this implementation):

  1. Pure sync UI, pure async daemon. Public methods called from the console (register_server, start_server, stop_server, get_status, get_all_external_tools, shutdown) never wait on the background event loop without a strict timeout, and register_server performs only synchronous file I/O.
  2. Non-blocking start. :meth:start_server schedules :meth:_async_start_server on :attr:loop via :func:asyncio.run_coroutine_threadsafe and returns immediately. The async path wraps connection setup in :func:asyncio.wait_for.
  3. Aggressive stop. :meth:_async_stop_server first tries a graceful close, then always force-stops the captured subprocess (terminate(), escalating to kill() if it has not exited) so no zombie servers linger.
  4. Process tracking. Each running entry stores session, exit_stack, and process (asyncio.subprocess.Process) for direct kill access.
  5. Bounded teardown. :meth:shutdown iterates a snapshot of running_servers, calls :meth:stop_server for each, then signals the loop to stop via loop.call_soon_threadsafe.
Source code in wintermute/integrations/mcp_runtime.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
class MCPClientManager:
    """Threaded manager for outbound MCP ``stdio`` client sessions.

    Architectural rules (enforced by this implementation):

    1. **Pure sync UI, pure async daemon.** Public methods called from the
       console (``register_server``, ``start_server``, ``stop_server``,
       ``get_status``, ``get_all_external_tools``, ``shutdown``) never wait
       on the background event loop without a strict timeout, and
       ``register_server`` performs only synchronous file I/O.
    2. **Non-blocking start.** :meth:`start_server` schedules
       :meth:`_async_start_server` on :attr:`loop` via
       :func:`asyncio.run_coroutine_threadsafe` and returns immediately.
       The async path wraps connection setup in :func:`asyncio.wait_for`.
    3. **Aggressive stop.** :meth:`_async_stop_server` first tries a
       graceful close, then unconditionally ``terminate()`` / ``kill()``
       the captured subprocess so no zombie servers linger.
    4. **Process tracking.** Each running entry stores ``session``,
       ``exit_stack``, and ``process`` (``asyncio.subprocess.Process``)
       for direct kill access.
    5. **Bounded teardown.** :meth:`shutdown` iterates a snapshot of
       ``running_servers``, calls :meth:`stop_server` for each, then
       signals the loop to stop via ``loop.call_soon_threadsafe``.

    In addition, at most one start may be in flight per server name:
    :meth:`start_server` reserves the name until the background coroutine
    has either registered the server or failed, so repeated calls cannot
    spawn duplicate subprocesses.
    """

    DEFAULT_CONFIG_PATH = _DEFAULT_CONFIG_PATH

    def __init__(self, config_path: Union[str, Path, None] = None) -> None:
        """Load the persisted registry; the background loop starts lazily.

        Args:
            config_path: Location of the JSON config file. Falls back to
                :attr:`DEFAULT_CONFIG_PATH` when falsy. The parent
                directory is created if missing.
        """
        self.config_path: Path = (
            Path(config_path) if config_path else self.DEFAULT_CONFIG_PATH
        )
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        self._registered: Dict[str, MCPServerDefinition] = self._load_config()

        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._loop_lock = threading.Lock()

        # State for every running server. Each entry has:
        #   {
        #       "session": ClientSession,
        #       "exit_stack": AsyncExitStack,
        #       "process": asyncio.subprocess.Process,
        #       "tools": list[mcp.types.Tool],
        #       "definition": MCPServerDefinition,
        #   }
        self.running_servers: Dict[str, Dict[str, Any]] = {}
        # Names whose start coroutine has been scheduled but has not yet
        # registered (or failed). Guarded by ``_state_lock``.
        self._starting: set[str] = set()
        self._state_lock = threading.Lock()

    # -- config persistence (Rule 1: pure sync) ----------------------------

    def _load_config(self) -> Dict[str, MCPServerDefinition]:
        """Read the config file into a ``name -> definition`` mapping.

        A missing file, undecodable/malformed JSON, or a non-list top-level
        value yields an empty mapping (with a warning where applicable).
        Individual entries that fail validation are skipped.
        """
        if not self.config_path.is_file():
            return {}
        try:
            with self.config_path.open("r", encoding="utf-8") as fh:
                raw = json.load(fh)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # A file that is not valid UTF-8 is just as unusable as one
            # that is not valid JSON; neither should crash the constructor.
            log.warning(
                "Ignoring malformed MCP config at %s: %s", self.config_path, exc
            )
            return {}
        if not isinstance(raw, list):
            log.warning("MCP config at %s is not a list; ignoring.", self.config_path)
            return {}
        out: Dict[str, MCPServerDefinition] = {}
        for entry in raw:
            try:
                defn = MCPServerDefinition.model_validate(entry)
            except Exception as exc:
                log.warning("Skipping invalid MCP server entry %r: %s", entry, exc)
                continue
            # Later entries with a duplicate name replace earlier ones.
            out[defn.name] = defn
        return out

    def _save_config(self) -> None:
        """Write the registry as a JSON list via a temp file + rename."""
        payload = [defn.model_dump() for defn in self._registered.values()]
        tmp_path = self.config_path.with_suffix(self.config_path.suffix + ".tmp")
        with tmp_path.open("w", encoding="utf-8") as fh:
            json.dump(payload, fh, indent=2)
        # Replace only after the full payload has been written.
        tmp_path.replace(self.config_path)

    # -- registration (Rule 1: pure sync, no event-loop touch) -------------

    def register_server(
        self,
        name: str,
        command: str,
        args: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
    ) -> MCPServerDefinition:
        """Persist a server definition. Pure sync file I/O — no event loop.

        Existing entries with the same ``name`` are overwritten.

        Args:
            name: Registry key for the server.
            command: Executable to launch.
            args: Command-line arguments; copied, defaults to ``[]``.
            env: Extra environment variables; copied, defaults to ``{}``.

        Returns:
            The stored definition.
        """
        defn = MCPServerDefinition(
            name=name,
            command=command,
            args=list(args) if args else [],
            env=dict(env) if env else {},
        )
        self._registered[name] = defn
        self._save_config()
        return defn

    def delete_server(self, name: str) -> bool:
        """Remove a registered server from the config file.

        If the server is currently running, it is stopped (with the same
        bounded timeout as :meth:`stop_server`) before removal.

        Returns:
            ``False`` if ``name`` was not registered, ``True`` otherwise.
        """
        if name not in self._registered:
            return False
        with self._state_lock:
            running = name in self.running_servers
        if running:
            self.stop_server(name)
        del self._registered[name]
        self._save_config()
        return True

    def list_registered(self) -> List[MCPServerDefinition]:
        """Return all registered server definitions."""
        return list(self._registered.values())

    # -- background loop ---------------------------------------------------

    def _ensure_loop(self) -> asyncio.AbstractEventLoop:
        """Return the background event loop, starting its thread if needed.

        A new daemon thread with a fresh loop is created when there is no
        loop yet or the previous thread is no longer alive.
        """
        with self._loop_lock:
            if (
                self.loop is not None
                and self._thread is not None
                and self._thread.is_alive()
            ):
                return self.loop
            ready = threading.Event()
            container: List[asyncio.AbstractEventLoop] = []

            def runner() -> None:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                container.append(loop)
                ready.set()
                try:
                    loop.run_forever()
                finally:
                    # Once stopped: cancel whatever is still pending and let
                    # those tasks unwind before the loop is closed.
                    try:
                        pending = asyncio.all_tasks(loop)
                        for task in pending:
                            task.cancel()
                        if pending:
                            loop.run_until_complete(
                                asyncio.gather(*pending, return_exceptions=True)
                            )
                    finally:
                        loop.close()

            self._thread = threading.Thread(
                target=runner, name="MCPClientManager-loop", daemon=True
            )
            self._thread.start()
            # The runner publishes the loop before setting the event.
            ready.wait()
            self.loop = container[0]
            return self.loop

    # -- start (Rule 2: non-blocking) --------------------------------------

    def start_server(self, name: str) -> str:
        """Schedule the connection on the background loop and return now.

        Returns a one-line status string suitable for the console. The
        actual stdio handshake happens on the daemon thread, bounded by
        :data:`_INIT_TIMEOUT_SECONDS`. Use :meth:`get_status` to confirm
        the server has finished initialising.

        A second call for the same ``name`` while the first start is still
        in flight is rejected instead of spawning another subprocess.
        """
        if name not in self._registered:
            return f"Server {name!r} is not registered."
        with self._state_lock:
            if name in self.running_servers:
                tool_count = len(self.running_servers[name].get("tools", []))
                return f"Server {name!r} is already running ({tool_count} tools)."
            if name in self._starting:
                return f"Server {name!r} is already starting."
            # Reserve the name atomically with the checks above.
            self._starting.add(name)
        try:
            loop = self._ensure_loop()
            asyncio.run_coroutine_threadsafe(self._async_start_server(name), loop)
        except BaseException:
            # Scheduling failed, so the coroutine will never release the
            # reservation itself.
            with self._state_lock:
                self._starting.discard(name)
            raise
        return f"Starting MCP server {name!r} in background…"

    async def _async_start_server(self, name: str) -> None:
        """Spawn the subprocess, bridge stdio, run the MCP handshake.

        Each handshake step (``initialize`` and ``list_tools``) is bounded
        by :data:`_INIT_TIMEOUT_SECONDS` so a wedged server cannot strand
        the daemon. On any failure path — including cancellation — the
        subprocess is force-killed before the coroutine returns.
        """
        try:
            await self._connect_server(name)
        finally:
            # Success, failure or cancellation: the start is over, so the
            # name may be started again.
            with self._state_lock:
                self._starting.discard(name)

    async def _connect_server(self, name: str) -> None:
        """Do the actual spawn + handshake + registration for ``name``."""
        definition = self._registered.get(name)
        if definition is None:
            log.warning("MCP server %r vanished from config before start", name)
            return

        # ---- Step 1: spawn the subprocess ourselves so we own the handle.
        try:
            # ``None`` makes the child inherit our environment unchanged;
            # otherwise the definition's variables are layered on top.
            spawn_env: Optional[Dict[str, str]] = None
            if definition.env:
                spawn_env = {**os.environ, **definition.env}
            process = await asyncio.create_subprocess_exec(
                definition.command,
                *definition.args,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                env=spawn_env,
            )
        except Exception as exc:
            log.error("Failed to spawn MCP server %r: %s", name, exc)
            return

        # ---- Step 2: bridge stdio with anyio in-memory channels.
        read_writer, read_reader = anyio.create_memory_object_stream[Any](
            max_buffer_size=0
        )
        write_writer, write_reader = anyio.create_memory_object_stream[Any](
            max_buffer_size=0
        )
        out_task = asyncio.create_task(_stdout_to_session_stream(process, read_writer))
        in_task = asyncio.create_task(_session_stream_to_stdin(process, write_reader))

        exit_stack = AsyncExitStack()

        async def _stop_pumps() -> None:
            for task in (out_task, in_task):
                task.cancel()
            await asyncio.gather(out_task, in_task, return_exceptions=True)
            for stream in (read_writer, write_writer):
                try:
                    await stream.aclose()
                except Exception:
                    # Best effort: a stream that fails to close must not
                    # prevent the remaining teardown.
                    pass

        exit_stack.push_async_callback(_stop_pumps)

        # ---- Step 3: run the MCP handshake under a strict timeout.
        try:
            session = await exit_stack.enter_async_context(
                ClientSession(read_reader, write_writer)
            )
            await asyncio.wait_for(session.initialize(), timeout=_INIT_TIMEOUT_SECONDS)
            listing = await asyncio.wait_for(
                session.list_tools(), timeout=_INIT_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            log.error(
                "MCP server %r did not finish handshake within %.1fs; aborting",
                name,
                _INIT_TIMEOUT_SECONDS,
            )
            await self._cleanup_after_failed_start(exit_stack, process)
            return
        except asyncio.CancelledError:
            # CancelledError is not an ``Exception`` subclass, so without
            # this clause a start cancelled mid-handshake (e.g. when the
            # loop is shut down) would leave the child process running.
            await self._cleanup_after_failed_start(exit_stack, process)
            raise
        except Exception as exc:
            log.error("MCP server %r failed during init: %s", name, exc)
            await self._cleanup_after_failed_start(exit_stack, process)
            return

        with self._state_lock:
            self.running_servers[name] = {
                "session": session,
                "exit_stack": exit_stack,
                "process": process,
                "tools": list(listing.tools),
                "definition": definition,
            }
        log.info(
            "MCP server %r ready (pid=%d, tools=%d)",
            name,
            process.pid,
            len(listing.tools),
        )

    async def _cleanup_after_failed_start(
        self,
        exit_stack: AsyncExitStack,
        process: asyncio.subprocess.Process,
    ) -> None:
        """Best-effort teardown when init times out, raises or is cancelled.

        The exit stack gets 2 seconds to unwind; the subprocess is
        force-killed afterwards regardless of how that went.
        """
        try:
            await asyncio.wait_for(exit_stack.aclose(), timeout=2.0)
        except Exception:
            log.debug("Ignoring error while unwinding a failed start", exc_info=True)
        finally:
            await self._force_kill(process)

    @staticmethod
    async def _force_kill(process: asyncio.subprocess.Process) -> None:
        """terminate → 1s wait → kill → 1s wait. Survives all races."""
        if process.returncode is not None:
            # Already exited; nothing to signal.
            return
        try:
            process.terminate()
        except ProcessLookupError:
            return
        try:
            await asyncio.wait_for(process.wait(), timeout=1.0)
            return
        except asyncio.TimeoutError:
            pass
        try:
            process.kill()
        except ProcessLookupError:
            return
        try:
            await asyncio.wait_for(process.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            log.error(
                "Subprocess pid=%d refused SIGKILL; leaving as orphan",
                process.pid,
            )

    # -- stop (Rule 3: aggressive termination) -----------------------------

    def stop_server(self, name: str, *, timeout: float = _STOP_TIMEOUT_SECONDS) -> str:
        """Stop ``name`` synchronously, bounded by ``timeout`` seconds.

        Internally schedules :meth:`_async_stop_server` on the background
        loop and waits with a strict timeout — never indefinitely.

        Returns:
            A one-line status string describing the outcome.
        """
        with self._state_lock:
            if name not in self.running_servers:
                return f"Server {name!r} is not running."
        loop = self._ensure_loop()
        future = asyncio.run_coroutine_threadsafe(self._async_stop_server(name), loop)
        try:
            future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            log.warning(
                "stop_server(%r) exceeded %.1fs; subprocess may still be terminating",
                name,
                timeout,
            )
            return (
                f"Stop of {name!r} did not finish in {timeout:.1f}s — "
                "the subprocess may still be terminating."
            )
        except Exception as exc:
            log.exception("Unexpected error stopping MCP server %r", name)
            return f"Error stopping {name!r}: {exc}"
        return f"Stopped MCP server {name!r}."

    async def _async_stop_server(self, name: str) -> None:
        """Graceful close → fall through → terminate → kill. In that order.

        Implements Rule 3 verbatim. Any exception thrown by the graceful
        path is swallowed; subprocess termination is mandatory.
        """
        # Step 1: retrieve the state (and remove it so re-entry is safe).
        with self._state_lock:
            state = self.running_servers.pop(name, None)
        if state is None:
            return

        exit_stack: AsyncExitStack = state["exit_stack"]
        process: asyncio.subprocess.Process = state["process"]

        # Step 2: graceful closure with a 3s ceiling.
        try:
            await asyncio.wait_for(exit_stack.aclose(), timeout=3.0)
        # Step 3: catch broadly — graceful close MUST NOT block force kill.
        except asyncio.TimeoutError:
            log.warning("Graceful close of MCP server %r timed out; forcing kill", name)
        except Exception as exc:
            log.warning(
                "Graceful close of MCP server %r raised %s; forcing kill",
                name,
                exc,
            )

        # Step 4: force-kill the subprocess no matter what.
        await self._force_kill(process)
        log.info("MCP server %r stopped", name)

    # -- introspection -----------------------------------------------------

    def get_status(self) -> List[Dict[str, Any]]:
        """Snapshot of every connected server.

        Each row carries ``name``, ``command``, ``args``, ``tools`` (the
        number of tools listed at start) and ``pid``.
        """
        with self._state_lock:
            snapshot = list(self.running_servers.items())
        out: List[Dict[str, Any]] = []
        for name, state in snapshot:
            definition: MCPServerDefinition = state["definition"]
            process: asyncio.subprocess.Process = state["process"]
            out.append(
                {
                    "name": name,
                    "command": definition.command,
                    "args": list(definition.args),
                    "tools": len(state.get("tools", [])),
                    "pid": process.pid,
                }
            )
        return out

    def get_all_external_tools(self) -> List[ToolSpec]:
        """Return every tool exposed by every connected MCP server.

        Tool names are namespaced as ``<server>__<tool>`` so external
        tools never collide with Wintermute's internal cartridge tools.
        """
        out: List[ToolSpec] = []
        with self._state_lock:
            # Copy the tool lists so the specs are built outside the lock.
            snapshot = [
                (name, list(state.get("tools", [])))
                for name, state in self.running_servers.items()
            ]
        for name, tools in snapshot:
            for tool in tools:
                # Missing or ``None`` attributes collapse to empty values.
                description = getattr(tool, "description", "") or ""
                input_schema = cast(JSONObject, getattr(tool, "inputSchema", {}) or {})
                out.append(
                    ToolSpec(
                        name=f"{name}__{tool.name}",
                        description=description,
                        input_schema=input_schema,
                        output_schema={},
                    )
                )
        return out

    # -- shutdown (Rule 5) -------------------------------------------------

    def shutdown(self, *, timeout: float = _STOP_TIMEOUT_SECONDS) -> None:
        """Stop every running server, then signal the daemon loop to stop.

        Iterates a copy of ``running_servers``' keys per Rule 5 so the
        underlying dict can be mutated by the in-flight stop coroutines
        without raising. Starts that are still in flight are cancelled by
        the loop thread once the loop stops.

        Args:
            timeout: Upper bound, in seconds, applied to each individual
                stop and to joining the loop thread.
        """
        with self._state_lock:
            names = list(self.running_servers.keys())
        for name in names:
            try:
                self.stop_server(name, timeout=timeout)
            except Exception:
                log.exception("shutdown: failed to stop %r", name)

        # Brief pause to let the loop finalise any cancellation callbacks.
        time.sleep(0.1)

        # Detach the loop/thread under the lock so a later _ensure_loop()
        # builds a fresh pair instead of reusing the one being stopped.
        with self._loop_lock:
            loop = self.loop
            thread = self._thread
            self.loop = None
            self._thread = None
        if loop is not None and thread is not None and thread.is_alive():
            loop.call_soon_threadsafe(loop.stop)
            thread.join(timeout=timeout)

delete_server(name)

Remove a registered server from the config file.

If the server is currently running, it is stopped (with the same bounded timeout as :meth:stop_server) before removal.

Source code in wintermute/integrations/mcp_runtime.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def delete_server(self, name: str) -> bool:
    """Drop ``name`` from the registry and persist the change.

    A server that is running at the time of the call is first stopped
    through :meth:`stop_server` (using its default bounded timeout).

    Returns:
        ``True`` when an entry was removed, ``False`` when ``name`` was
        never registered.
    """
    registry = self._registered
    if name not in registry:
        return False

    # Only the membership test needs the lock; stopping happens outside it.
    with self._state_lock:
        is_running = name in self.running_servers

    if is_running:
        self.stop_server(name)

    registry.pop(name)
    self._save_config()
    return True

get_all_external_tools()

Return every tool exposed by every connected MCP server.

Tool names are namespaced as <server>__<tool> so external tools never collide with Wintermute's internal cartridge tools.

Source code in wintermute/integrations/mcp_runtime.py
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
def get_all_external_tools(self) -> List[ToolSpec]:
    """Collect a ``ToolSpec`` for each tool of each connected server.

    Every spec is named ``<server>__<tool>`` so external tools never
    collide with Wintermute's internal cartridge tools. A missing or
    empty description / input schema becomes ``""`` / ``{}``; the output
    schema is always ``{}``.
    """
    # Copy the per-server tool lists under the lock, build specs outside.
    with self._state_lock:
        per_server = [
            (server, list(entry.get("tools", [])))
            for server, entry in self.running_servers.items()
        ]

    specs: List[ToolSpec] = []
    for server, server_tools in per_server:
        for tool in server_tools:
            summary = getattr(tool, "description", "") or ""
            schema = cast(JSONObject, getattr(tool, "inputSchema", {}) or {})
            spec = ToolSpec(
                name=f"{server}__{tool.name}",
                description=summary,
                input_schema=schema,
                output_schema={},
            )
            specs.append(spec)
    return specs

get_status()

Snapshot of every connected server.

Source code in wintermute/integrations/mcp_runtime.py
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
def get_status(self) -> List[Dict[str, Any]]:
    """Report one row per connected server.

    Each row holds ``name``, ``command``, ``args`` (copied into a list),
    ``tools`` (number of known tools) and ``pid``. The running-server
    table is copied under the state lock; rows are built afterwards.
    """
    with self._state_lock:
        entries = list(self.running_servers.items())

    rows: List[Dict[str, Any]] = []
    for server_name, entry in entries:
        defn: MCPServerDefinition = entry["definition"]
        proc: asyncio.subprocess.Process = entry["process"]
        rows.append(
            dict(
                name=server_name,
                command=defn.command,
                args=list(defn.args),
                tools=len(entry.get("tools", [])),
                pid=proc.pid,
            )
        )
    return rows

list_registered()

Return all registered server definitions.

Source code in wintermute/integrations/mcp_runtime.py
396
397
398
def list_registered(self) -> List[MCPServerDefinition]:
    """Return a new list holding every registered server definition."""
    return [*self._registered.values()]

register_server(name, command, args=None, env=None)

Persist a server definition. Pure sync file I/O — no event loop.

Existing entries with the same name are overwritten.

Source code in wintermute/integrations/mcp_runtime.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def register_server(
    self,
    name: str,
    command: str,
    args: Optional[List[str]] = None,
    env: Optional[Dict[str, str]] = None,
) -> MCPServerDefinition:
    """Store a server definition and write the registry to disk.

    Synchronous file I/O only — the event loop is not involved. An
    existing entry under the same ``name`` is replaced.

    Args:
        name: Registry key for the server.
        command: Executable to launch.
        args: Command-line arguments; copied, ``[]`` when omitted/empty.
        env: Environment variables; copied, ``{}`` when omitted/empty.

    Returns:
        The definition that was stored.
    """
    # Copy caller-supplied containers so later mutation cannot leak in.
    arg_list = list(args) if args else []
    env_map = dict(env) if env else {}

    definition = MCPServerDefinition(
        name=name, command=command, args=arg_list, env=env_map
    )
    self._registered[name] = definition
    self._save_config()
    return definition

shutdown(*, timeout=_STOP_TIMEOUT_SECONDS)

Stop every running server, then signal the daemon loop to stop.

Iterates a copy of running_servers' keys per Rule 5 so the underlying dict can be mutated by the in-flight stop coroutines without raising.

Source code in wintermute/integrations/mcp_runtime.py
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
def shutdown(self, *, timeout: float = _STOP_TIMEOUT_SECONDS) -> None:
    """Stop all running servers, then ask the daemon loop to stop.

    The server names are copied first (Rule 5) so the stop coroutines
    may mutate ``running_servers`` while this method iterates.

    Args:
        timeout: Seconds allowed for each individual stop and for joining
            the loop thread.
    """
    with self._state_lock:
        server_names = [*self.running_servers]

    for server_name in server_names:
        try:
            self.stop_server(server_name, timeout=timeout)
        except Exception:
            log.exception("shutdown: failed to stop %r", server_name)

    # Brief pause to let the loop finalise any cancellation callbacks.
    time.sleep(0.1)

    # Detach loop and thread from the instance while holding the lock.
    with self._loop_lock:
        loop, worker = self.loop, self._thread
        self.loop = None
        self._thread = None

    if loop is None or worker is None or not worker.is_alive():
        return
    loop.call_soon_threadsafe(loop.stop)
    worker.join(timeout=timeout)

start_server(name)

Schedule the connection on the background loop and return now.

Returns a one-line status string suitable for the console. The actual stdio handshake happens on the daemon thread, bounded by :data:_INIT_TIMEOUT_SECONDS. Use :meth:get_status to confirm the server has finished initialising.

Source code in wintermute/integrations/mcp_runtime.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def start_server(self, name: str) -> str:
    """Queue the connection on the background loop and return at once.

    The return value is a one-line status string for the console. The
    stdio handshake itself runs on the daemon thread, bounded by
    :data:`_INIT_TIMEOUT_SECONDS`; call :meth:`get_status` to find out
    whether the server finished initialising.
    """
    if name not in self._registered:
        return f"Server {name!r} is not registered."

    with self._state_lock:
        already_up = name in self.running_servers
        tool_count = (
            len(self.running_servers[name].get("tools", [])) if already_up else 0
        )
    if already_up:
        return f"Server {name!r} is already running ({tool_count} tools)."

    # Fire and forget: the returned future is intentionally not awaited.
    background_loop = self._ensure_loop()
    asyncio.run_coroutine_threadsafe(
        self._async_start_server(name), background_loop
    )
    return f"Starting MCP server {name!r} in background…"

stop_server(name, *, timeout=_STOP_TIMEOUT_SECONDS)

Stop name synchronously, bounded by timeout seconds.

Internally schedules :meth:_async_stop_server on the background loop and waits with a strict timeout — never indefinitely.

Source code in wintermute/integrations/mcp_runtime.py
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def stop_server(self, name: str, *, timeout: float = _STOP_TIMEOUT_SECONDS) -> str:
    """Synchronously stop ``name``, waiting at most ``timeout`` seconds.

    :meth:`_async_stop_server` is scheduled on the background loop and
    its result awaited with a strict timeout — never indefinitely.

    Returns:
        A one-line status string describing the outcome.
    """
    with self._state_lock:
        is_running = name in self.running_servers
    if not is_running:
        return f"Server {name!r} is not running."

    background_loop = self._ensure_loop()
    pending = asyncio.run_coroutine_threadsafe(
        self._async_stop_server(name), background_loop
    )
    try:
        pending.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # The coroutine keeps running on the loop; only the wait gave up.
        log.warning(
            "stop_server(%r) exceeded %.1fs; subprocess may still be terminating",
            name,
            timeout,
        )
        message = (
            f"Stop of {name!r} did not finish in {timeout:.1f}s — "
            "the subprocess may still be terminating."
        )
        return message
    except Exception as exc:
        log.exception("Unexpected error stopping MCP server %r", name)
        return f"Error stopping {name!r}: {exc}"
    else:
        return f"Stopped MCP server {name!r}."

MCPRuntime

Manages the lifecycle of an MCP connection and registers its tools into the Wintermute ToolRegistry.

Source code in wintermute/integrations/mcp_runtime.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class MCPRuntime:
    """
    Manages the lifecycle of an MCP connection and registers its tools
    into the Wintermute ToolRegistry.

    Typical use is ``await runtime.initialize()`` once, tool calls through
    the registered handlers, then ``await runtime.shutdown()``. After
    ``shutdown`` the runtime holds no session, so tool calls fail with a
    clear ``RuntimeError`` instead of talking to a closed connection.
    """

    def __init__(
        self, command: str, args: List[str], env: Optional[Dict[str, str]] = None
    ) -> None:
        """Record the stdio spawn parameters; nothing is launched here.

        Args:
            command: Executable used to start the MCP server.
            args: Command-line arguments for ``command``.
            env: Optional environment mapping forwarded to the server params.
        """
        self.server_params = StdioServerParameters(command=command, args=args, env=env)
        self.session: Optional[ClientSession] = None
        self._exit_stack: Optional[AsyncExitStack] = None

    async def initialize(self) -> None:
        """Connects to MCP and registers tools into Wintermute's global registry.

        If connecting, the handshake, or the tool listing fails, everything
        opened so far is closed again before the exception propagates, so a
        failed ``initialize`` does not leave a server process behind.
        """
        # A local reference keeps the type non-Optional for mypy without an assert.
        stack = AsyncExitStack()
        self._exit_stack = stack

        try:
            # 1. Connect
            read, write = await stack.enter_async_context(
                stdio_client(self.server_params)
            )
            session = await stack.enter_async_context(ClientSession(read, write))
            self.session = session
            await session.initialize()

            # 2. List Tools from MCP
            mcp_tools = await session.list_tools()
        except BaseException:
            # Undo the partial connection (also resets session/_exit_stack).
            await self.shutdown()
            raise

        # 3. Register as Wintermute Tools
        for mt in mcp_tools.tools:
            wm_tool = Tool(
                name=mt.name,
                input_schema=mt.inputSchema,
                output_schema={},
                handler=self._make_handler(mt.name),
            )
            # Register in wintermute/ai/tools_runtime.py
            global_registry.register(wm_tool)

    def _make_handler(self, tool_name: str) -> Any:
        """Build the registry handler bound to ``tool_name``."""

        def handler(args: JSONObject) -> JSONObject:
            # Wintermute expects a synchronous return (JSONObject), but MCP is async.
            # Inside a running loop we hand back the coroutine itself and rely on
            # the caller (run_surgeon.py, per the original note) to await it;
            # cast() silences mypy about returning a coroutine.
            try:
                loop = asyncio.get_event_loop()
                owns_loop = False
            except RuntimeError:
                loop = asyncio.new_event_loop()
                owns_loop = True

            # Convert Mapping to Dict for MCP
            mcp_args = cast(Dict[str, Any], args)

            if loop.is_running():
                return cast(JSONObject, self._execute_mcp_tool(tool_name, mcp_args))

            try:
                return loop.run_until_complete(
                    self._execute_mcp_tool(tool_name, mcp_args)
                )
            finally:
                # Close only a loop created here; a borrowed loop belongs to
                # someone else and must stay open.
                if owns_loop:
                    loop.close()

        return handler

    async def _execute_mcp_tool(
        self, name: str, args: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Call the MCP tool ``name`` and flatten its content blocks to text.

        Returns:
            ``{"output": <text>}`` where text blocks are joined with newlines
            and image / embedded-resource blocks become placeholders.

        Raises:
            RuntimeError: If there is no active session (never initialized,
                or already shut down).
        """
        if not self.session:
            raise RuntimeError("MCP Session is not initialized")

        result = await self.session.call_tool(name, args)

        # Flatten content blocks to a single string for the LLM
        output_text: List[str] = []
        if result.content:
            for c in result.content:
                if c.type == "text":
                    # Accessing .text is safe here because we checked type == 'text'
                    output_text.append(c.text)
                elif c.type == "image":
                    output_text.append("[Image Data]")
                elif c.type == "resource":
                    output_text.append("[Embedded Resource]")
                # 'audio' and 'resource_link' blocks are ignored for text output for now

        return {"output": "\n".join(output_text)}

    async def shutdown(self) -> None:
        """Close the connection and forget the session; safe to call repeatedly."""
        # Detach state first so the runtime never exposes a closed session,
        # even if aclose() itself raises.
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        if stack is not None:
            await stack.aclose()

initialize() async

Connects to MCP and registers tools into Wintermute's global registry.

Source code in wintermute/integrations/mcp_runtime.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def initialize(self) -> None:
    """Connects to MCP and registers tools into Wintermute's global registry.

    Steps: open the stdio transport, open and initialize a client session
    on it, list the server's tools, then register one Wintermute ``Tool``
    per MCP tool in ``global_registry``.

    If the transport, handshake, or tool listing fails, everything opened so
    far is closed and the runtime's session state is reset before the
    exception propagates, so a failed call does not leave a server process
    behind.
    """
    # A local reference keeps the type non-Optional for mypy without an assert.
    stack = AsyncExitStack()
    self._exit_stack = stack

    try:
        # 1. Connect
        read, write = await stack.enter_async_context(
            stdio_client(self.server_params)
        )
        session = await stack.enter_async_context(ClientSession(read, write))
        self.session = session
        await session.initialize()

        # 2. List Tools from MCP
        mcp_tools = await session.list_tools()
    except BaseException:
        # Undo the partial connection and never expose a half-open session.
        self.session = None
        self._exit_stack = None
        await stack.aclose()
        raise

    # The factory binds the tool name per handler; it is defined once, not per tool.
    def make_handler(tool_name: str) -> Any:
        def handler(args: JSONObject) -> JSONObject:
            # Wintermute expects a synchronous return (JSONObject), but MCP is async.
            # Inside a running loop we hand back the coroutine itself and rely on
            # the caller (run_surgeon.py, per the original note) to await it;
            # cast() silences mypy about returning a coroutine.
            try:
                loop = asyncio.get_event_loop()
                owns_loop = False
            except RuntimeError:
                loop = asyncio.new_event_loop()
                owns_loop = True

            # Convert Mapping to Dict for MCP
            mcp_args = cast(Dict[str, Any], args)

            if loop.is_running():
                return cast(JSONObject, self._execute_mcp_tool(tool_name, mcp_args))

            try:
                return loop.run_until_complete(
                    self._execute_mcp_tool(tool_name, mcp_args)
                )
            finally:
                # Close only a loop created here; a borrowed loop must stay open.
                if owns_loop:
                    loop.close()

        return handler

    # 3. Register as Wintermute Tools
    for mt in mcp_tools.tools:
        wm_tool = Tool(
            name=mt.name,
            input_schema=mt.inputSchema,
            output_schema={},
            handler=make_handler(mt.name),
        )

        # Register in wintermute/ai/tools_runtime.py
        global_registry.register(wm_tool)

MCPServerDefinition

Bases: BaseModel

Persisted definition of an external MCP server.

Stored in ~/.wintermute/mcp_servers.json as a JSON array. Each entry captures the spawn recipe for a stdio MCP server the operator wants the console to connect to (Ghidra, Binary Ninja, custom helpers, etc.).

Source code in wintermute/integrations/mcp_runtime.py
173
174
175
176
177
178
179
180
181
182
183
184
class MCPServerDefinition(BaseModel):
    """Persisted definition of an external MCP server.

    Stored in ``~/.wintermute/mcp_servers.json`` as a JSON array. Each entry
    captures the spawn recipe for a stdio MCP server the operator wants the
    console to connect to (Ghidra, Binary Ninja, custom helpers, etc.).

    Only ``name`` and ``command`` are required; ``args`` and ``env`` default
    to fresh empty containers per instance.
    """

    # Identifier of this server entry (presumably the name passed to
    # start_server/stop_server — confirm against MCPClientManager).
    name: str
    # Executable to launch for the stdio server.
    command: str
    # Command-line arguments for ``command``; defaults to an empty list.
    args: List[str] = Field(default_factory=list)
    # Environment variables for the spawned server; defaults to an empty dict.
    # NOTE(review): whether these are merged with or replace the parent
    # environment is decided by the launcher and is not visible here.
    env: Dict[str, str] = Field(default_factory=dict)