From 02070b7f02d75750bdbbb83c70ed9ef3cfe94002 Mon Sep 17 00:00:00 2001 From: Andres Eduardo Garcia Marquez Date: Mon, 2 Mar 2026 02:27:03 -0500 Subject: [PATCH] fix: async tools with asyncio.wait_for to prevent MCP blocking All tools are now async with hard timeout (10s) via asyncio.wait_for + ThreadPoolExecutor. If any sync operation (WebSocket, UPnP, REST) hangs, the MCP returns an error instead of freezing the entire Claude Code session. Root cause: sync functions blocked FastMCP's event loop. SIGALRM and ThreadPoolExecutor.result(timeout) don't work inside FastMCP threads. asyncio.wait_for is the correct pattern for this context. Co-Authored-By: Claude Opus 4.6 --- main.py | 117 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 80 insertions(+), 37 deletions(-) diff --git a/main.py b/main.py index cf39bd7..64e87bf 100644 --- a/main.py +++ b/main.py @@ -2,7 +2,10 @@ from __future__ import annotations +import asyncio import logging +from concurrent.futures import ThreadPoolExecutor +from functools import partial from typing import Any, Optional from mcp.server.fastmcp import FastMCP @@ -12,9 +15,12 @@ logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", ) +log = logging.getLogger("samsung-tv") mcp = FastMCP("samsung-tv") _tv = SamsungTV() +_pool = ThreadPoolExecutor(max_workers=2) +TOOL_TIMEOUT = 10 def _ok(message: str = "Done", **data: Any) -> dict[str, Any]: @@ -25,11 +31,19 @@ def _err(message: str) -> dict[str, Any]: return {"success": False, "message": message} -def _safe(fn, *args, **kwargs) -> dict[str, Any]: +async def _safe(fn, *args, timeout: float = TOOL_TIMEOUT, **kwargs) -> dict[str, Any]: + """Run sync function in thread with hard timeout. Never blocks the MCP.""" + loop = asyncio.get_event_loop() try: - return fn(*args, **kwargs) + return await asyncio.wait_for( + loop.run_in_executor(_pool, partial(fn, *args, **kwargs)), + timeout=timeout, + ) + except asyncio.TimeoutError: + log.error("Tool timed out after %ss", timeout) + return _err(f"TV did not respond within {timeout}s. Is it on and reachable?") except Exception as e: - logging.getLogger("samsung-tv").error("%s: %s", fn.__name__, e) + log.error("Tool error: %s", e) return _err(str(e)) @@ -37,28 +51,28 @@ def _safe(fn, *args, **kwargs) -> dict[str, Any]: @mcp.tool() -def tv_discover() -> dict[str, Any]: +async def tv_discover() -> dict[str, Any]: """Scan the local network for Samsung Smart TVs via SSDP. Returns a list of found TVs with their IP, name, and model. """ - return _safe(lambda: _ok("Scan complete", tvs=discover())) + return await _safe(lambda: _ok("Scan complete", tvs=discover())) @mcp.tool() -def tv_info() -> dict[str, Any]: +async def tv_info() -> dict[str, Any]: """Get TV status: model, IP, power state, current volume, resolution. Use this first to verify the TV is reachable and powered on. """ - return _safe(lambda: _ok("TV info retrieved", **_tv.info())) + return await _safe(lambda: _ok("TV info retrieved", **_tv.info())) # ── Power ──────────────────────────────────────────────────────── @mcp.tool() -def tv_power(action: str = "off") -> dict[str, Any]: +async def tv_power(action: str = "off") -> dict[str, Any]: """Turn the TV on or off. Args: @@ -67,17 +81,17 @@ def tv_power(action: str = "off") -> dict[str, Any]: def _do(): if action.lower() == "on": _tv.power_on() - return _ok("Wake-on-LAN packet sent. TV should turn on in a few seconds.") + return _ok("Wake-on-LAN packet sent. TV should turn on shortly.") _tv.power_off() return _ok("TV power off command sent") - return _safe(_do) + return await _safe(_do) # ── Remote Keys ────────────────────────────────────────────────── @mcp.tool() -def tv_key(key: str, times: int = 1) -> dict[str, Any]: +async def tv_key(key: str, times: int = 1) -> dict[str, Any]: """Send a remote control key press to the TV. Args: @@ -90,11 +104,14 @@ def tv_key(key: str, times: int = 1) -> dict[str, Any]: PLAY, PAUSE, STOP, FF, REWIND, RED, GREEN, YELLOW, BLUE, 0-9, PMODE, DYNAMIC, STANDARD, MOVIE1, GAME, SLEEP, CAPTION, APP_LIST. """ - return _safe(lambda: (_tv.send_key(key, times), _ok(f"Sent {key} x{times}"))[1]) + def _do(): + _tv.send_key(key, times) + return _ok(f"Sent {key} x{times}") + return await _safe(_do) @mcp.tool() -def tv_keys(keys: str, delay: float = 0.3) -> dict[str, Any]: +async def tv_keys(keys: str, delay: float = 0.3) -> dict[str, Any]: """Send a sequence of key presses with configurable delay between them. Args: @@ -104,25 +121,33 @@ def tv_keys(keys: str, delay: float = 0.3) -> dict[str, Any]: Use this for menu navigation or complex sequences. """ key_list = [k.strip() for k in keys.split(",") if k.strip()] - return _safe(lambda: (_tv.send_keys(key_list, delay), _ok(f"Sent {len(key_list)} keys: {keys}"))[1]) + # Allow more time for sequences + t = max(TOOL_TIMEOUT, len(key_list) * delay + 5) + def _do(): + _tv.send_keys(key_list, delay) + return _ok(f"Sent {len(key_list)} keys: {keys}") + return await _safe(_do, timeout=t) @mcp.tool() -def tv_navigate(action: str) -> dict[str, Any]: +async def tv_navigate(action: str) -> dict[str, Any]: """Quick navigation with semantic names instead of raw key codes. Args: action: One of: home, back, exit, menu, source, guide, info, tools, up, down, left, right, enter/ok, play, pause, stop, ff, rewind. """ - return _safe(lambda: (_tv.navigate(action), _ok(f"Navigated: {action}"))[1]) + def _do(): + _tv.navigate(action) + return _ok(f"Navigated: {action}") + return await _safe(_do) # ── Volume & Channel ───────────────────────────────────────────── @mcp.tool() -def tv_volume( +async def tv_volume( level: Optional[int] = None, action: Optional[str] = None, ) -> dict[str, Any]: @@ -132,8 +157,8 @@ def tv_volume( level: Set volume to this exact value (0-100). Omit to just read. action: "up", "down", "mute", "unmute". Omit to just read or use level. - Examples: tv_volume() → get current, tv_volume(level=25) → set to 25, - tv_volume(action="mute") → toggle mute. + Examples: tv_volume() -> get current, tv_volume(level=25) -> set to 25, + tv_volume(action="mute") -> toggle mute. """ def _do(): if level is not None: @@ -157,11 +182,11 @@ def tv_volume( vol = _tv.get_volume() muted = _tv.get_mute() return _ok(f"Volume: {vol}, Muted: {muted}", volume=vol, muted=muted) - return _safe(_do) + return await _safe(_do) @mcp.tool() -def tv_channel( +async def tv_channel( number: Optional[int] = None, direction: Optional[str] = None, ) -> dict[str, Any]: @@ -173,23 +198,26 @@ def tv_channel( Provide either number or direction, not both. """ - return _safe(lambda: (_tv.channel(number, direction), _ok(f"Channel changed"))[1]) + def _do(): + _tv.channel(number, direction) + return _ok("Channel changed") + return await _safe(_do) # ── Apps ───────────────────────────────────────────────────────── @mcp.tool() -def tv_apps() -> dict[str, Any]: +async def tv_apps() -> dict[str, Any]: """List all installed apps on the TV with their IDs. Returns app names and IDs that can be used with tv_launch. """ - return _safe(lambda: _ok("Apps retrieved", apps=_tv.list_apps())) + return _ok("Apps retrieved", apps=_tv.list_apps()) @mcp.tool() -def tv_launch( +async def tv_launch( app: str, deep_link: Optional[str] = None, ) -> dict[str, Any]: @@ -202,48 +230,60 @@ def tv_launch( The app name is case-insensitive and supports fuzzy matching. """ - return _safe(lambda: (_tv.launch_app(app, deep_link), _ok(f"Launched {app}"))[1]) + def _do(): + _tv.launch_app(app, deep_link) + return _ok(f"Launched {app}") + return await _safe(_do) @mcp.tool() -def tv_close_app(app: str) -> dict[str, Any]: +async def tv_close_app(app: str) -> dict[str, Any]: """Close a running app on the TV. Args: app: App name or ID (same as tv_launch). """ - return _safe(lambda: (_tv.close_app(app), _ok(f"Closed {app}"))[1]) + def _do(): + _tv.close_app(app) + return _ok(f"Closed {app}") + return await _safe(_do) # ── Browser & Text ─────────────────────────────────────────────── @mcp.tool() -def tv_browser(url: str) -> dict[str, Any]: +async def tv_browser(url: str) -> dict[str, Any]: """Open a URL in the TV's built-in web browser. Args: url: The full URL to open (e.g. "https://google.com"). """ - return _safe(lambda: (_tv.open_browser(url), _ok(f"Opened {url}"))[1]) + def _do(): + _tv.open_browser(url) + return _ok(f"Opened {url}") + return await _safe(_do) @mcp.tool() -def tv_text(text: str) -> dict[str, Any]: +async def tv_text(text: str) -> dict[str, Any]: """Type text into the currently active input field on the TV. Args: text: The text to type. Only works when a text input is active (virtual keyboard is visible on the TV screen). """ - return _safe(lambda: (_tv.send_text(text), _ok(f"Typed text ({len(text)} chars)"))[1]) + def _do(): + _tv.send_text(text) + return _ok(f"Typed text ({len(text)} chars)") + return await _safe(_do) # ── Cursor ─────────────────────────────────────────────────────── @mcp.tool() -def tv_cursor(x: int, y: int, duration: int = 500) -> dict[str, Any]: +async def tv_cursor(x: int, y: int, duration: int = 500) -> dict[str, Any]: """Move the virtual cursor/pointer on the TV screen. Args: @@ -251,14 +291,17 @@ def tv_cursor(x: int, y: int, duration: int = 500) -> dict[str, Any]: y: Vertical position (pixels from top). duration: Movement duration in milliseconds (default 500). """ - return _safe(lambda: (_tv.move_cursor(x, y, duration), _ok(f"Cursor moved to ({x}, {y})"))[1]) + def _do(): + _tv.move_cursor(x, y, duration) + return _ok(f"Cursor moved to ({x}, {y})") + return await _safe(_do) # ── DLNA Media ─────────────────────────────────────────────────── @mcp.tool() -def tv_media( +async def tv_media( action: str = "status", url: Optional[str] = None, title: Optional[str] = None, @@ -277,7 +320,7 @@ def tv_media( tv_media(action="play_url", url="http://server/video.mp4") tv_media(action="pause") tv_media(action="seek", seek_to="00:10:00") - tv_media(action="status") → returns current position and state. + tv_media(action="status") -> returns current position and state. """ def _do(): if action == "play_url": @@ -291,7 +334,7 @@ def tv_media( if action == "status": return _ok("Playback status", **result) return _ok(f"Media {action} executed", **result) - return _safe(_do) + return await _safe(_do) if __name__ == "__main__":