fix: async tools with asyncio.wait_for to prevent MCP blocking

All tools are now async with hard timeout (10s) via
asyncio.wait_for + ThreadPoolExecutor. If any sync operation
(WebSocket, UPnP, REST) hangs, the MCP returns an error
instead of freezing the entire Claude Code session.

Root cause: sync functions blocked FastMCP's event loop.
SIGALRM and ThreadPoolExecutor.result(timeout) don't work
inside FastMCP threads. asyncio.wait_for is the correct
pattern for this context.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Andres Eduardo Garcia Marquez 2026-03-02 02:27:03 -05:00
parent ba81fabefc
commit 02070b7f02
1 changed files with 80 additions and 37 deletions

117
main.py
View File

@ -2,7 +2,10 @@
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Optional
from mcp.server.fastmcp import FastMCP
@ -12,9 +15,12 @@ logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
log = logging.getLogger("samsung-tv")
mcp = FastMCP("samsung-tv")
_tv = SamsungTV()
_pool = ThreadPoolExecutor(max_workers=2)
TOOL_TIMEOUT = 10
def _ok(message: str = "Done", **data: Any) -> dict[str, Any]:
@ -25,11 +31,19 @@ def _err(message: str) -> dict[str, Any]:
return {"success": False, "message": message}
def _safe(fn, *args, **kwargs) -> dict[str, Any]:
async def _safe(fn, *args, timeout: float = TOOL_TIMEOUT, **kwargs) -> dict[str, Any]:
"""Run sync function in thread with hard timeout. Never blocks the MCP."""
loop = asyncio.get_event_loop()
try:
return fn(*args, **kwargs)
return await asyncio.wait_for(
loop.run_in_executor(_pool, partial(fn, *args, **kwargs)),
timeout=timeout,
)
except asyncio.TimeoutError:
log.error("Tool timed out after %ss", timeout)
return _err(f"TV did not respond within {timeout}s. Is it on and reachable?")
except Exception as e:
logging.getLogger("samsung-tv").error("%s: %s", fn.__name__, e)
log.error("Tool error: %s", e)
return _err(str(e))
@ -37,28 +51,28 @@ def _safe(fn, *args, **kwargs) -> dict[str, Any]:
@mcp.tool()
def tv_discover() -> dict[str, Any]:
async def tv_discover() -> dict[str, Any]:
"""Scan the local network for Samsung Smart TVs via SSDP.
Returns a list of found TVs with their IP, name, and model.
"""
return _safe(lambda: _ok("Scan complete", tvs=discover()))
return await _safe(lambda: _ok("Scan complete", tvs=discover()))
@mcp.tool()
def tv_info() -> dict[str, Any]:
async def tv_info() -> dict[str, Any]:
"""Get TV status: model, IP, power state, current volume, resolution.
Use this first to verify the TV is reachable and powered on.
"""
return _safe(lambda: _ok("TV info retrieved", **_tv.info()))
return await _safe(lambda: _ok("TV info retrieved", **_tv.info()))
# ── Power ────────────────────────────────────────────────────────
@mcp.tool()
def tv_power(action: str = "off") -> dict[str, Any]:
async def tv_power(action: str = "off") -> dict[str, Any]:
"""Turn the TV on or off.
Args:
@ -67,17 +81,17 @@ def tv_power(action: str = "off") -> dict[str, Any]:
def _do():
if action.lower() == "on":
_tv.power_on()
return _ok("Wake-on-LAN packet sent. TV should turn on in a few seconds.")
return _ok("Wake-on-LAN packet sent. TV should turn on shortly.")
_tv.power_off()
return _ok("TV power off command sent")
return _safe(_do)
return await _safe(_do)
# ── Remote Keys ──────────────────────────────────────────────────
@mcp.tool()
def tv_key(key: str, times: int = 1) -> dict[str, Any]:
async def tv_key(key: str, times: int = 1) -> dict[str, Any]:
"""Send a remote control key press to the TV.
Args:
@ -90,11 +104,14 @@ def tv_key(key: str, times: int = 1) -> dict[str, Any]:
PLAY, PAUSE, STOP, FF, REWIND, RED, GREEN, YELLOW, BLUE, 0-9,
PMODE, DYNAMIC, STANDARD, MOVIE1, GAME, SLEEP, CAPTION, APP_LIST.
"""
return _safe(lambda: (_tv.send_key(key, times), _ok(f"Sent {key} x{times}"))[1])
def _do():
_tv.send_key(key, times)
return _ok(f"Sent {key} x{times}")
return await _safe(_do)
@mcp.tool()
def tv_keys(keys: str, delay: float = 0.3) -> dict[str, Any]:
async def tv_keys(keys: str, delay: float = 0.3) -> dict[str, Any]:
"""Send a sequence of key presses with configurable delay between them.
Args:
@ -104,25 +121,33 @@ def tv_keys(keys: str, delay: float = 0.3) -> dict[str, Any]:
Use this for menu navigation or complex sequences.
"""
key_list = [k.strip() for k in keys.split(",") if k.strip()]
return _safe(lambda: (_tv.send_keys(key_list, delay), _ok(f"Sent {len(key_list)} keys: {keys}"))[1])
# Allow more time for sequences
t = max(TOOL_TIMEOUT, len(key_list) * delay + 5)
def _do():
_tv.send_keys(key_list, delay)
return _ok(f"Sent {len(key_list)} keys: {keys}")
return await _safe(_do, timeout=t)
@mcp.tool()
def tv_navigate(action: str) -> dict[str, Any]:
async def tv_navigate(action: str) -> dict[str, Any]:
"""Quick navigation with semantic names instead of raw key codes.
Args:
action: One of: home, back, exit, menu, source, guide, info, tools,
up, down, left, right, enter/ok, play, pause, stop, ff, rewind.
"""
return _safe(lambda: (_tv.navigate(action), _ok(f"Navigated: {action}"))[1])
def _do():
_tv.navigate(action)
return _ok(f"Navigated: {action}")
return await _safe(_do)
# ── Volume & Channel ─────────────────────────────────────────────
@mcp.tool()
def tv_volume(
async def tv_volume(
level: Optional[int] = None,
action: Optional[str] = None,
) -> dict[str, Any]:
@ -132,8 +157,8 @@ def tv_volume(
level: Set volume to this exact value (0-100). Omit to just read.
action: "up", "down", "mute", "unmute". Omit to just read or use level.
Examples: tv_volume() get current, tv_volume(level=25) set to 25,
tv_volume(action="mute") toggle mute.
Examples: tv_volume() -> get current, tv_volume(level=25) -> set to 25,
tv_volume(action="mute") -> toggle mute.
"""
def _do():
if level is not None:
@ -157,11 +182,11 @@ def tv_volume(
vol = _tv.get_volume()
muted = _tv.get_mute()
return _ok(f"Volume: {vol}, Muted: {muted}", volume=vol, muted=muted)
return _safe(_do)
return await _safe(_do)
@mcp.tool()
def tv_channel(
async def tv_channel(
number: Optional[int] = None,
direction: Optional[str] = None,
) -> dict[str, Any]:
@ -173,23 +198,26 @@ def tv_channel(
Provide either number or direction, not both.
"""
return _safe(lambda: (_tv.channel(number, direction), _ok(f"Channel changed"))[1])
def _do():
_tv.channel(number, direction)
return _ok("Channel changed")
return await _safe(_do)
# ── Apps ─────────────────────────────────────────────────────────
@mcp.tool()
def tv_apps() -> dict[str, Any]:
async def tv_apps() -> dict[str, Any]:
"""List all installed apps on the TV with their IDs.
Returns app names and IDs that can be used with tv_launch.
"""
return _safe(lambda: _ok("Apps retrieved", apps=_tv.list_apps()))
return _ok("Apps retrieved", apps=_tv.list_apps())
@mcp.tool()
def tv_launch(
async def tv_launch(
app: str,
deep_link: Optional[str] = None,
) -> dict[str, Any]:
@ -202,48 +230,60 @@ def tv_launch(
The app name is case-insensitive and supports fuzzy matching.
"""
return _safe(lambda: (_tv.launch_app(app, deep_link), _ok(f"Launched {app}"))[1])
def _do():
_tv.launch_app(app, deep_link)
return _ok(f"Launched {app}")
return await _safe(_do)
@mcp.tool()
def tv_close_app(app: str) -> dict[str, Any]:
async def tv_close_app(app: str) -> dict[str, Any]:
"""Close a running app on the TV.
Args:
app: App name or ID (same as tv_launch).
"""
return _safe(lambda: (_tv.close_app(app), _ok(f"Closed {app}"))[1])
def _do():
_tv.close_app(app)
return _ok(f"Closed {app}")
return await _safe(_do)
# ── Browser & Text ───────────────────────────────────────────────
@mcp.tool()
def tv_browser(url: str) -> dict[str, Any]:
async def tv_browser(url: str) -> dict[str, Any]:
"""Open a URL in the TV's built-in web browser.
Args:
url: The full URL to open (e.g. "https://google.com").
"""
return _safe(lambda: (_tv.open_browser(url), _ok(f"Opened {url}"))[1])
def _do():
_tv.open_browser(url)
return _ok(f"Opened {url}")
return await _safe(_do)
@mcp.tool()
def tv_text(text: str) -> dict[str, Any]:
async def tv_text(text: str) -> dict[str, Any]:
"""Type text into the currently active input field on the TV.
Args:
text: The text to type. Only works when a text input is active
(virtual keyboard is visible on the TV screen).
"""
return _safe(lambda: (_tv.send_text(text), _ok(f"Typed text ({len(text)} chars)"))[1])
def _do():
_tv.send_text(text)
return _ok(f"Typed text ({len(text)} chars)")
return await _safe(_do)
# ── Cursor ───────────────────────────────────────────────────────
@mcp.tool()
def tv_cursor(x: int, y: int, duration: int = 500) -> dict[str, Any]:
async def tv_cursor(x: int, y: int, duration: int = 500) -> dict[str, Any]:
"""Move the virtual cursor/pointer on the TV screen.
Args:
@ -251,14 +291,17 @@ def tv_cursor(x: int, y: int, duration: int = 500) -> dict[str, Any]:
y: Vertical position (pixels from top).
duration: Movement duration in milliseconds (default 500).
"""
return _safe(lambda: (_tv.move_cursor(x, y, duration), _ok(f"Cursor moved to ({x}, {y})"))[1])
def _do():
_tv.move_cursor(x, y, duration)
return _ok(f"Cursor moved to ({x}, {y})")
return await _safe(_do)
# ── DLNA Media ───────────────────────────────────────────────────
@mcp.tool()
def tv_media(
async def tv_media(
action: str = "status",
url: Optional[str] = None,
title: Optional[str] = None,
@ -277,7 +320,7 @@ def tv_media(
tv_media(action="play_url", url="http://server/video.mp4")
tv_media(action="pause")
tv_media(action="seek", seek_to="00:10:00")
tv_media(action="status") returns current position and state.
tv_media(action="status") -> returns current position and state.
"""
def _do():
if action == "play_url":
@ -291,7 +334,7 @@ def tv_media(
if action == "status":
return _ok("Playback status", **result)
return _ok(f"Media {action} executed", **result)
return _safe(_do)
return await _safe(_do)
if __name__ == "__main__":