Skip to content

backend

SurgeonBackend

Source code in wintermute/integrations/surgeon/backend.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class SurgeonBackend:
    """Async client wrapper around the SURGEON MCP server.

    Combines a ``SurgeonController`` (built for the ``server.py`` script that
    sits next to this module) with an MCP ``ClientSession`` opened over stdio,
    and exposes the server's tools in the OpenAI function-calling layout.
    """

    def __init__(self, surgeon_root: str) -> None:
        """Create the controller; no process or session is started here.

        Args:
            surgeon_root: SURGEON root path, handed to the controller and
                later exported to the server as ``SURGEON_ROOT``.
        """
        server_script = Path(__file__).parent / "server.py"

        self.controller = SurgeonController(str(server_script), surgeon_root)
        # Both stay None until start() succeeds and are reset by stop().
        self.session: Optional[ClientSession] = None
        self._exit_stack: Optional[AsyncExitStack] = None

    async def start(self) -> None:
        """Starts the subprocess and initializes the MCP session.

        Raises:
            RuntimeError: If the controller reports that the server could
                not be started.

        A failure while opening or initializing the MCP session is logged
        and followed by ``stop()``; it is not re-raised, so callers should
        check ``self.session`` (``None`` means not connected).
        """
        if not self.controller.start():
            raise RuntimeError("Failed to start SURGEON MCP server.")

        # NOTE(review): the controller has already launched the script, and
        # these parameters hand stdio_client a command for the same script,
        # which presumably launches a second instance -- confirm intent.
        server_params = StdioServerParameters(
            command="python3",
            args=[str(self.controller.mcp_script)],
            env={"SURGEON_ROOT": str(self.controller.surgeon_root)},
        )

        self._exit_stack = AsyncExitStack()

        try:
            read, write = await self._exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            self.session = await self._exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await self.session.initialize()
            log.info("Connected to SURGEON MCP Session.")

        except Exception as e:
            log.error(f"Failed to connect MCP session: {e}")
            # stop() also clears self.session, so a half-initialized session
            # is never left behind for get_ai_tools()/execute_tool().
            await self.stop()

    async def stop(self) -> None:
        """Close the MCP session (if any) and stop the server process.

        The session and exit stack references are cleared before closing so
        later calls see the backend as disconnected, and the controller is
        stopped even if closing the exit stack raises.
        """
        exit_stack, self._exit_stack = self._exit_stack, None
        self.session = None
        try:
            if exit_stack is not None:
                await exit_stack.aclose()
        finally:
            self.controller.stop()

    async def get_ai_tools(self) -> List[Dict[str, Any]]:
        """
        Fetches tools from MCP and converts them to OpenAI/Wintermute format.

        Returns:
            One ``{"type": "function", "function": {...}}`` entry per tool,
            or an empty list when not connected or when listing fails (the
            failure is logged).
        """
        if not self.session:
            return []

        try:
            mcp_tools = await self.session.list_tools()
            return [
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    },
                }
                for tool in mcp_tools.tools
            ]
        except Exception as e:
            log.error(f"Failed to list tools from SURGEON MCP: {e}")
            return []

    async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """
        Executes a tool on the remote MCP server.

        Args:
            tool_name: Name of the tool to call.
            arguments: Arguments passed through to the tool call.

        Returns:
            The ``text`` of every content item whose ``type`` is ``"text"``,
            joined with newlines; or an ``"Error ..."`` string when the
            backend is not connected or the call raises.
        """
        if not self.session:
            return "Error: SURGEON Backend not connected."

        try:
            result = await self.session.call_tool(tool_name, arguments)
            return "\n".join(c.text for c in result.content if c.type == "text")
        except Exception as e:
            log.error(f"SURGEON Execution Error: {e}")
            return f"Error executing {tool_name}: {str(e)}"

execute_tool(tool_name, arguments) async

Executes a tool on the remote MCP server.

Source code in wintermute/integrations/surgeon/backend.py
193
194
195
196
197
198
199
200
201
202
203
204
205
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
    """
    Run ``tool_name`` on the MCP server and return its textual output.

    Returns the ``text`` of each content item whose ``type`` is ``"text"``,
    joined with newlines. When there is no session, or the call raises, an
    ``"Error ..."`` string is returned instead (the exception is logged).
    """
    session = self.session
    if not session:
        return "Error: SURGEON Backend not connected."

    try:
        response = await session.call_tool(tool_name, arguments)
        text_parts = []
        for item in response.content:
            # Non-text content items are skipped.
            if item.type == "text":
                text_parts.append(item.text)
        return "\n".join(text_parts)
    except Exception as exc:
        log.error(f"SURGEON Execution Error: {exc}")
        return f"Error executing {tool_name}: {str(exc)}"

get_ai_tools() async

Fetches tools from MCP and converts them to OpenAI/Wintermute format.

Source code in wintermute/integrations/surgeon/backend.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
async def get_ai_tools(self) -> List[Dict[str, Any]]:
    """
    List the MCP server's tools in OpenAI/Wintermute function format.

    Each tool becomes ``{"type": "function", "function": {...}}`` carrying
    its name, description and input schema. Returns an empty list when
    there is no session or when listing fails (the failure is logged).
    """
    session = self.session
    if not session:
        return []

    try:
        listing = await session.list_tools()
        return [
            {
                "type": "function",
                "function": {
                    "name": spec.name,
                    "description": spec.description,
                    "parameters": spec.inputSchema,
                },
            }
            for spec in listing.tools
        ]
    except Exception as exc:
        log.error(f"Failed to list tools from SURGEON MCP: {exc}")
        return []

start() async

Starts the subprocess and initializes the MCP session.

Source code in wintermute/integrations/surgeon/backend.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
async def start(self) -> None:
    """Starts the subprocess and initializes the MCP session.

    Raises:
        RuntimeError: If the controller reports that the server could not
            be started.

    A failure while opening or initializing the MCP session is logged and
    cleaned up via ``stop()`` rather than re-raised; afterwards
    ``self.session`` is ``None`` so callers can detect the failure.
    """
    if not self.controller.start():
        raise RuntimeError("Failed to start SURGEON MCP server.")

    server_params = StdioServerParameters(
        command="python3",
        args=[str(self.controller.mcp_script)],
        env={"SURGEON_ROOT": str(self.controller.surgeon_root)},
    )

    self._exit_stack = AsyncExitStack()

    try:
        read, write = await self._exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.session = await self._exit_stack.enter_async_context(
            ClientSession(read, write)
        )
        await self.session.initialize()
        log.info("Connected to SURGEON MCP Session.")

    except Exception as e:
        log.error(f"Failed to connect MCP session: {e}")
        try:
            await self.stop()
        finally:
            # self.session may already be assigned when initialize() fails;
            # drop it so later tool calls report "not connected" instead of
            # using a session whose transport has been closed.
            self.session = None
            self._exit_stack = None

SurgeonController

Source code in wintermute/integrations/surgeon/backend.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class SurgeonController:
    """Starts, exposes and stops the SURGEON MCP server subprocess."""

    def __init__(self, mcp_script_path: str, surgeon_root: str) -> None:
        """Record the script and root paths; no process is spawned here.

        Args:
            mcp_script_path: Path to the MCP server script to launch.
            surgeon_root: Path exported to the server as ``SURGEON_ROOT``.
        """
        self.mcp_script = Path(mcp_script_path)
        self.surgeon_root = Path(surgeon_root)
        # Popen[str] because the process is opened in text mode.
        self._process: Optional[subprocess.Popen[str]] = None

        # Ensure cleanup on script exit
        atexit.register(self.stop)

    def _build_env(self) -> Dict[str, str]:
        """Return the child environment: current env plus ``SURGEON_ROOT``."""
        import os

        env = dict(os.environ)  # Inherit current env vars
        # Assigned last so the configured root wins over any SURGEON_ROOT
        # already present in the parent environment.
        env["SURGEON_ROOT"] = str(self.surgeon_root)
        return env

    def start(self) -> bool:
        """
        Spawns the MCP server in a non-blocking subprocess.

        Returns:
            True if the server is already running or was started and is
            still alive after a short grace period; False if the script is
            missing, the process exited immediately, or spawning raised.
        """
        if self._process and self._process.poll() is None:
            log.warning("SURGEON MCP server is already running.")
            return True

        if not self.mcp_script.exists():
            log.error(f"MCP Script not found at: {self.mcp_script}")
            return False

        cmd = [sys.executable, str(self.mcp_script)]
        env = self._build_env()

        try:
            log.info("Starting SURGEON MCP Server...")

            # Popen is non-blocking.
            self._process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
                bufsize=0,
            )

            # Give the process a moment so an immediate crash is detected.
            time.sleep(0.5)
            if self._process.poll() is not None:
                # If it crashed immediately, read stderr
                _, err = self._process.communicate()
                log.error(f"SURGEON MCP failed to start: {err}")
                return False

            log.info(f"SURGEON MCP Server started (PID: {self._process.pid})")
            return True

        except Exception as e:
            log.error(f"Failed to spawn SURGEON MCP: {e}")
            return False

    def get_stdio(self) -> Tuple[Optional[IO[str]], Optional[IO[str]]]:
        """
        Returns the stdin/stdout pipes to be used by the MCP Client.

        Raises:
            RuntimeError: If no process has been created by ``start()``.
        """
        if not self._process:
            raise RuntimeError("Process not running. Call start() first.")
        return self._process.stdin, self._process.stdout

    def stop(self) -> None:
        """
        Terminates the MCP server gracefully.

        Does nothing when no process exists or it has already exited.
        Otherwise calls terminate(), waits up to 2 seconds, and falls back
        to kill() if the process has not exited by then.
        """
        if self._process and self._process.poll() is None:
            log.info("Stopping SURGEON MCP Server...")
            self._process.terminate()
            try:
                self._process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                self._process.kill()
                # Reap the killed child so it does not linger as a zombie.
                self._process.wait()
            log.info("SURGEON MCP Server stopped.")

get_stdio()

Returns the stdin/stdout pipes to be used by the MCP Client.

Source code in wintermute/integrations/surgeon/backend.py
 99
100
101
102
103
104
105
def get_stdio(self) -> Tuple[Optional[IO[str]], Optional[IO[str]]]:
    """
    Hand back the server process's ``(stdin, stdout)`` pipes for the MCP Client.

    Raises:
        RuntimeError: If no process object exists yet.
    """
    proc = self._process
    if proc:
        return proc.stdin, proc.stdout
    raise RuntimeError("Process not running. Call start() first.")

start()

Spawns the MCP server in a non-blocking subprocess.

Source code in wintermute/integrations/surgeon/backend.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def start(self) -> bool:
    """
    Spawns the MCP server in a non-blocking subprocess.

    Returns:
        True if the server is already running or was started and is still
        alive after a short grace period; False if the script is missing,
        the process exited immediately, or spawning raised.
    """
    import os

    if self._process and self._process.poll() is None:
        log.warning("SURGEON MCP server is already running.")
        return True

    if not self.mcp_script.exists():
        log.error(f"MCP Script not found at: {self.mcp_script}")
        return False

    cmd = [sys.executable, str(self.mcp_script)]

    # SURGEON_ROOT comes after the inherited variables so the configured
    # root wins over any SURGEON_ROOT already set in the parent environment.
    env = {
        **os.environ,  # Inherit current env vars
        "SURGEON_ROOT": str(self.surgeon_root),
    }

    try:
        log.info("Starting SURGEON MCP Server...")

        # Popen is non-blocking.
        self._process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env=env,
            bufsize=0,
        )

        # Give the process a moment so an immediate crash is detected.
        time.sleep(0.5)
        if self._process.poll() is not None:
            # If it crashed immediately, read stderr
            _, err = self._process.communicate()
            log.error(f"SURGEON MCP failed to start: {err}")
            return False

        log.info(f"SURGEON MCP Server started (PID: {self._process.pid})")
        return True

    except Exception as e:
        log.error(f"Failed to spawn SURGEON MCP: {e}")
        return False

stop()

Terminates the MCP server gracefully.

Source code in wintermute/integrations/surgeon/backend.py
108
109
110
111
112
113
114
115
116
117
118
119
def stop(self) -> None:
    """
    Terminates the MCP server gracefully.

    Does nothing when no process exists or it has already exited.
    Otherwise calls terminate(), waits up to 2 seconds, and falls back to
    kill() if the process has not exited by then.
    """
    if self._process and self._process.poll() is None:
        log.info("Stopping SURGEON MCP Server...")
        self._process.terminate()
        try:
            self._process.wait(timeout=2)
        except subprocess.TimeoutExpired:
            self._process.kill()
            # Reap the killed child so it does not linger as a zombie.
            self._process.wait()
        log.info("SURGEON MCP Server stopped.")