fix: eliminate all blocking operations that freeze MCP

- Remove ThreadPoolExecutor/SIGALRM approaches (don't work in FastMCP threads)
- list_apps: return known aliases directly (WS app_list blocks on TU7000)
- _send_ws: simplified with auto-reconnect, no thread wrapping
- All timeouts via SamsungTVWS timeout param + urlopen timeout
- No operation can block the MCP process anymore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Andres Eduardo Garcia Marquez 2026-03-02 02:13:27 -05:00
parent 4dd9132a9b
commit ba81fabefc
1 changed files with 55 additions and 158 deletions

213
tv.py
View File

@ -7,19 +7,17 @@ import logging
import socket
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import URLError
WS_TIMEOUT = 8 # seconds — max wait for any WebSocket operation
from samsungtvws import SamsungTVWS
log = logging.getLogger("samsung-tv")
WS_TIMEOUT = 5
TOKEN_FILE = str(Path(__file__).parent / "token.json")
_UPNP_NS = "urn:schemas-upnp-org:service"
_SOAP_ENV = (
@ -29,7 +27,6 @@ _SOAP_ENV = (
"<s:Body>{body}</s:Body></s:Envelope>"
)
# App aliases → official IDs (region-independent first, then fallbacks)
APP_ALIASES: dict[str, list[str]] = {
"netflix": ["11101200001", "3201907018807"],
"youtube": ["111299001912"],
@ -52,32 +49,20 @@ APP_ALIASES: dict[str, list[str]] = {
}
NAVIGATE_KEYS = {
"home": "KEY_HOME",
"back": "KEY_RETURN",
"exit": "KEY_EXIT",
"menu": "KEY_MENU",
"source": "KEY_SOURCE",
"guide": "KEY_GUIDE",
"info": "KEY_INFO",
"tools": "KEY_TOOLS",
"up": "KEY_UP",
"down": "KEY_DOWN",
"left": "KEY_LEFT",
"right": "KEY_RIGHT",
"enter": "KEY_ENTER",
"ok": "KEY_ENTER",
"play": "KEY_PLAY",
"pause": "KEY_PAUSE",
"stop": "KEY_STOP",
"ff": "KEY_FF",
"rewind": "KEY_REWIND",
"home": "KEY_HOME", "back": "KEY_RETURN", "exit": "KEY_EXIT",
"menu": "KEY_MENU", "source": "KEY_SOURCE", "guide": "KEY_GUIDE",
"info": "KEY_INFO", "tools": "KEY_TOOLS",
"up": "KEY_UP", "down": "KEY_DOWN", "left": "KEY_LEFT", "right": "KEY_RIGHT",
"enter": "KEY_ENTER", "ok": "KEY_ENTER",
"play": "KEY_PLAY", "pause": "KEY_PAUSE", "stop": "KEY_STOP",
"ff": "KEY_FF", "rewind": "KEY_REWIND",
}
# ── SSDP Discovery ──────────────────────────────────────────────
def discover(timeout: float = 5.0) -> list[dict[str, Any]]:
def discover(timeout: float = 4.0) -> list[dict[str, Any]]:
"""Discover Samsung TVs on the local network via SSDP."""
msg = (
"M-SEARCH * HTTP/1.1\r\n"
@ -120,42 +105,35 @@ def discover(timeout: float = 5.0) -> list[dict[str, Any]]:
mfr = dev.findtext("d:manufacturer", "", ns)
if "samsung" not in (mfr or "").lower():
continue
ip = urlparse(loc).hostname
tvs.append({
"ip": ip,
"name": fn,
"model": mn,
"manufacturer": mfr,
"location": loc,
"ip": urlparse(loc).hostname,
"name": fn, "model": mn, "manufacturer": mfr,
})
except Exception:
continue
return tvs
# ── UPnP SOAP helpers ───────────────────────────────────────────
# ── UPnP SOAP ───────────────────────────────────────────────────
def _soap_call(
ip: str, control_url: str, service: str, action: str, args: str = ""
) -> str:
"""Execute a UPnP SOAP action and return raw XML response."""
body = f'<u:{action} xmlns:u="{_UPNP_NS}:{service}:1">{args}</u:{action}>'
envelope = _SOAP_ENV.format(body=body)
url = f"http://{ip}:9197{control_url}"
req = Request(
url,
f"http://{ip}:9197{control_url}",
data=envelope.encode(),
headers={
"Content-Type": 'text/xml; charset="utf-8"',
"SOAPAction": f'"{_UPNP_NS}:{service}:1#{action}"',
},
)
return urlopen(req, timeout=5).read().decode()
return urlopen(req, timeout=WS_TIMEOUT).read().decode()
def _soap_value(xml_text: str, tag: str) -> str | None:
"""Extract a single value from SOAP response XML."""
root = ET.fromstring(xml_text)
for elem in root.iter():
if elem.tag.endswith(tag):
@ -167,7 +145,6 @@ def _soap_value(xml_text: str, tag: str) -> str | None:
def wake_on_lan(mac: str) -> None:
"""Send WoL magic packet to power on the TV."""
mac_bytes = bytes.fromhex(mac.replace(":", "").replace("-", ""))
packet = b"\xff" * 6 + mac_bytes * 16
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
@ -179,12 +156,11 @@ def wake_on_lan(mac: str) -> None:
class SamsungTV:
"""Unified Samsung TV controller combining WebSocket, REST, and UPnP."""
"""Unified Samsung TV controller — WebSocket, REST, UPnP."""
def __init__(self, ip: str | None = None):
self._ip = ip
self._ws: SamsungTVWS | None = None
self._info: dict[str, Any] | None = None
self._mac: str | None = None
# ── Connection ───────────────────────────────────────────
@ -219,48 +195,32 @@ class SamsungTV:
self._ws.open()
return self._ws
def _reconnect(self) -> SamsungTVWS:
self._ws = None
return self._ensure_ws()
def _send_ws(self, method: str, timeout: float = WS_TIMEOUT, **kwargs: Any) -> Any:
"""Send WebSocket command with auto-reconnect and timeout."""
ws = self._ensure_ws()
def _send_ws(self, method: str, **kwargs: Any) -> Any:
"""Send WebSocket command with auto-reconnect."""
for attempt in range(2):
ws = self._ensure_ws()
try:
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(getattr(ws, method), **kwargs)
return future.result(timeout=timeout)
except FuturesTimeout:
log.error("WS %s timed out after %ss", method, timeout)
self._ws = None
raise TimeoutError(
f"TV did not respond to '{method}' within {timeout}s. "
"TV may be unresponsive or in a state that doesn't support this command."
)
return getattr(ws, method)(**kwargs)
except Exception as e:
self._ws = None
if attempt == 0:
log.warning("WS error (%s), reconnecting...", e)
ws = self._reconnect()
else:
raise
raise ConnectionError(f"TV WebSocket failed after retry: {e}") from e
# ── Device Info ──────────────────────────────────────────
def info(self) -> dict[str, Any]:
ip = self._ensure_ip()
try:
raw = urlopen(f"http://{ip}:8001/api/v2/", timeout=5).read()
raw = urlopen(f"http://{ip}:8001/api/v2/", timeout=WS_TIMEOUT).read()
data = json.loads(raw)
device = data.get("device", {})
self._mac = device.get("wifiMac")
# Enrich with UPnP volume
try:
vol = self.get_volume()
device["currentVolume"] = vol
device["currentVolume"] = self.get_volume()
except Exception:
pass
self._info = device
return {
"name": device.get("name", "Unknown"),
"model": device.get("modelName", "Unknown"),
@ -283,14 +243,13 @@ class SamsungTV:
def power_on(self, mac: str | None = None) -> None:
target_mac = mac or self._mac
if not target_mac:
# Try to get MAC from previous info
try:
self.info()
target_mac = self._mac
except Exception:
pass
if not target_mac:
raise ValueError("MAC address required for Wake-on-LAN. Get it with tv_info first.")
raise ValueError("MAC address required. Use tv_info first.")
wake_on_lan(target_mac)
# ── Keys ─────────────────────────────────────────────────
@ -313,9 +272,7 @@ class SamsungTV:
def navigate(self, action: str) -> None:
key = NAVIGATE_KEYS.get(action.lower())
if not key:
raise ValueError(
f"Unknown action '{action}'. Valid: {', '.join(NAVIGATE_KEYS)}"
)
raise ValueError(f"Unknown: '{action}'. Valid: {', '.join(NAVIGATE_KEYS)}")
self.send_key(key)
# ── Volume ───────────────────────────────────────────────
@ -331,12 +288,11 @@ class SamsungTV:
def set_volume(self, level: int) -> None:
ip = self._ensure_ip()
level = max(0, min(100, level))
_soap_call(
ip, "/upnp/control/RenderingControl1", "RenderingControl",
"SetVolume",
f"<InstanceID>0</InstanceID><Channel>Master</Channel>"
f"<DesiredVolume>{level}</DesiredVolume>",
f"<DesiredVolume>{max(0, min(100, level))}</DesiredVolume>",
)
def get_mute(self) -> bool:
@ -345,8 +301,7 @@ class SamsungTV:
ip, "/upnp/control/RenderingControl1", "RenderingControl",
"GetMute", "<InstanceID>0</InstanceID><Channel>Master</Channel>",
)
val = _soap_value(xml, "CurrentMute")
return val == "1" if val else False
return _soap_value(xml, "CurrentMute") == "1"
def set_mute(self, mute: bool) -> None:
ip = self._ensure_ip()
@ -367,8 +322,7 @@ class SamsungTV:
time.sleep(0.3)
self.send_key("KEY_ENTER")
elif direction:
key = "KEY_CHUP" if direction.lower() == "up" else "KEY_CHDOWN"
self.send_key(key)
self.send_key("KEY_CHUP" if direction.lower() == "up" else "KEY_CHDOWN")
else:
raise ValueError("Provide either number or direction ('up'/'down')")
@ -378,68 +332,29 @@ class SamsungTV:
alias = name_or_id.lower().strip()
if alias in APP_ALIASES:
return APP_ALIASES[alias][0]
# Check if it looks like an app ID already
if name_or_id.replace(".", "").replace("_", "").isalnum() and (
len(name_or_id) > 8 or "." in name_or_id
):
return name_or_id
# Fuzzy match against aliases
for key, ids in APP_ALIASES.items():
if alias in key or key in alias:
return ids[0]
return name_or_id
def list_apps(self) -> list[dict[str, Any]]:
import signal
def _timeout_handler(signum, frame):
raise TimeoutError("app_list timeout")
from samsungtvws.remote import ChannelEmitCommand
from samsungtvws.helper import process_api_response
from samsungtvws.event import ED_INSTALLED_APP_EVENT, parse_installed_app
ws = self._ensure_ws()
assert ws.connection
old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(WS_TIMEOUT)
try:
ws._ws_send(ChannelEmitCommand.get_installed_app())
response = process_api_response(ws.connection.recv())
signal.alarm(0)
if response.get("event") == ED_INSTALLED_APP_EVENT:
raw = parse_installed_app(response)
return [{"id": a.get("appId"), "name": a.get("name", "Unknown")} for a in (raw or [])]
return self._known_apps_fallback()
except (TimeoutError, Exception) as e:
signal.alarm(0)
self._ws = None
log.warning("list_apps from TV failed (%s), using known aliases", e)
return self._known_apps_fallback()
finally:
signal.signal(signal.SIGALRM, old_handler)
@staticmethod
def _known_apps_fallback() -> list[dict[str, Any]]:
"""Return known launchable apps. Direct query not supported on TU7000."""
return [{"id": ids[0], "name": name.title()} for name, ids in APP_ALIASES.items()]
def launch_app(
self, name_or_id: str, meta_tag: str | None = None
) -> None:
def launch_app(self, name_or_id: str, meta_tag: str | None = None) -> None:
app_id = self._resolve_app_id(name_or_id)
ip = self._ensure_ip()
# Try REST first (more reliable for launching)
try:
req = Request(f"http://{ip}:8001/api/v2/applications/{app_id}", method="POST")
urlopen(req, timeout=5)
urlopen(req, timeout=WS_TIMEOUT)
return
except Exception:
pass
# Fallback to WebSocket
self._send_ws(
"run_app", app_id=app_id, app_type=2, meta_tag=meta_tag or ""
)
self._send_ws("run_app", app_id=app_id, app_type=2, meta_tag=meta_tag or "")
def close_app(self, name_or_id: str) -> None:
app_id = self._resolve_app_id(name_or_id)
@ -447,20 +362,18 @@ class SamsungTV:
req = Request(
f"http://{ip}:8001/api/v2/applications/{app_id}", method="DELETE"
)
urlopen(req, timeout=5)
urlopen(req, timeout=WS_TIMEOUT)
# ── Browser ──────────────────────────────────────────────
def open_browser(self, url: str) -> None:
self._send_ws("open_browser", url=url)
# ── Text Input ───────────────────────────────────────────
# ── Text & Cursor ────────────────────────────────────────
def send_text(self, text: str) -> None:
self._send_ws("send_text", text=text)
# ── Cursor ───────────────────────────────────────────────
def move_cursor(self, x: int, y: int, duration: int = 500) -> None:
self._send_ws("move_cursor", x=x, y=y, duration=duration)
@ -488,39 +401,25 @@ class SamsungTV:
def media_control(self, action: str, target: str | None = None) -> dict[str, Any]:
ip = self._ensure_ip()
action_lower = action.lower()
if action_lower == "play":
_soap_call(
ip, "/upnp/control/AVTransport1", "AVTransport",
"Play", "<InstanceID>0</InstanceID><Speed>1</Speed>",
)
elif action_lower == "pause":
_soap_call(
ip, "/upnp/control/AVTransport1", "AVTransport",
"Pause", "<InstanceID>0</InstanceID>",
)
elif action_lower == "stop":
_soap_call(
ip, "/upnp/control/AVTransport1", "AVTransport",
"Stop", "<InstanceID>0</InstanceID>",
)
elif action_lower == "seek" and target:
_soap_call(
ip, "/upnp/control/AVTransport1", "AVTransport",
"Seek",
f"<InstanceID>0</InstanceID>"
f"<Unit>REL_TIME</Unit><Target>{target}</Target>",
)
elif action_lower == "status":
xml_t = _soap_call(
ip, "/upnp/control/AVTransport1", "AVTransport",
"GetTransportInfo", "<InstanceID>0</InstanceID>",
)
xml_p = _soap_call(
ip, "/upnp/control/AVTransport1", "AVTransport",
"GetPositionInfo", "<InstanceID>0</InstanceID>",
)
a = action.lower()
if a == "play":
_soap_call(ip, "/upnp/control/AVTransport1", "AVTransport",
"Play", "<InstanceID>0</InstanceID><Speed>1</Speed>")
elif a == "pause":
_soap_call(ip, "/upnp/control/AVTransport1", "AVTransport",
"Pause", "<InstanceID>0</InstanceID>")
elif a == "stop":
_soap_call(ip, "/upnp/control/AVTransport1", "AVTransport",
"Stop", "<InstanceID>0</InstanceID>")
elif a == "seek" and target:
_soap_call(ip, "/upnp/control/AVTransport1", "AVTransport",
"Seek", f"<InstanceID>0</InstanceID>"
f"<Unit>REL_TIME</Unit><Target>{target}</Target>")
elif a == "status":
xml_t = _soap_call(ip, "/upnp/control/AVTransport1", "AVTransport",
"GetTransportInfo", "<InstanceID>0</InstanceID>")
xml_p = _soap_call(ip, "/upnp/control/AVTransport1", "AVTransport",
"GetPositionInfo", "<InstanceID>0</InstanceID>")
return {
"state": _soap_value(xml_t, "CurrentTransportState"),
"position": _soap_value(xml_p, "RelTime"),
@ -528,10 +427,8 @@ class SamsungTV:
"uri": _soap_value(xml_p, "TrackURI"),
}
else:
raise ValueError(
f"Unknown action '{action}'. Valid: play, pause, stop, seek, status"
)
return {"action": action_lower, "done": True}
raise ValueError(f"Unknown: '{action}'. Valid: play, pause, stop, seek, status")
return {"action": a, "done": True}
def close(self) -> None:
if self._ws: