commit 90c882997d68cfd6c1ccd45a9eb08a839288f7a9 Author: yqsphp Date: Mon Jan 12 10:48:20 2026 +0800 媒体投屏初次提交 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a09c56d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.idea diff --git a/README.md b/README.md new file mode 100644 index 0000000..00afa76 --- /dev/null +++ b/README.md @@ -0,0 +1,70 @@ +# 媒体投屏器 (Media Caster) + +

+ 媒体投屏器 Logo +
+ 一个优雅、高效的本地媒体投屏工具 +

+ +

+ 功能特性 • + 快速开始 • + 使用指南 • + 技术架构 • + 开发指南 +

+ +

+ Python版本 + PyQt5 + 许可证 + 平台支持 +

+ +## 🌟 简介 + +媒体投屏器是一款基于 Python 和 PyQt5 开发的高性能媒体投屏工具,支持将本地音视频文件通过 DLNA/UPnP 协议投屏到智能电视、投影仪等设备。 + +### 主要特点 +- 🎯 **一键投屏**:简单三步完成媒体投屏 +- 📱 **设备自动发现**:智能扫描局域网内的投屏设备 +- ⚡ **高性能传输**:内置 HTTP 服务器,流畅播放体验 + +## ✨ 功能特性 + +### 🚀 核心功能 +| 功能 | 描述 | 状态 | +|------|------|------| +| **设备发现** | 自动扫描局域网内 DLNA/UPnP 设备 | ✅ | +| **文件浏览** | 支持多种音视频格式选择 | ✅ | +| **投屏控制** | 播放、暂停、停止、音量控制 | ✅ | +| **音量调节** | 滑块控制、静音切换 | ✅ | +| **多设备支持** | 同时发现和管理多个设备 | ✅ | + + +### 🖥️ 系统要求 +- **操作系统**: Windows 10/11, macOS 10.15+, Ubuntu 18.04+ +- **Python版本**: Python 3.7 或更高版本 +- **内存要求**: 最少 4GB RAM +- **网络要求**: 设备与电脑需在同一局域网 + +## 🚀 快速开始 + +### 安装方法 + +#### 方法一:从源代码安装(推荐) +```bash +# 1. 克隆仓库 +git clone https://github.com/yqsphp/media-caster.git +cd media-caster + +# 2. 创建虚拟环境 +python -m venv venv + +# Windows +venv\Scripts\activate +# Linux/Mac +source venv/bin/activate + +# 3. 运行程序 +python main.py \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..f89fb24 --- /dev/null +++ b/__init__.py @@ -0,0 +1,21 @@ +# __init__.py (项目根目录) +""" +本地媒体投屏应用 + +此应用支持将本地媒体文件投屏到投影仪、电视等设备。 +支持多协议自动检测,包括DLNA、Chromecast等设备。 +""" + +__version__ = "1.0.1" +__author__ = "Local Cast" +__description__ = "本地媒体投屏工具" + +# 定义公开的接口 +__all__ = [ + "DeviceDiscovery", + "MainWindow" +] + +# 导入核心功能,便于直接访问 +from core.device_discovery import * +from ui.main_window import * diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..655c966 --- /dev/null +++ b/core/__init__.py @@ -0,0 +1,15 @@ +# core/__init__.py +""" +核心功能模块 +包含设备发现、投屏服务和设备管理等功能类。 +""" +from .logger import AppLogger +from .device_discovery import DeviceDiscovery +from .dlna_controller import DLNAController +from .http_file_server import HTTPFileServer +__all__ = [ + "AppLogger", + "DeviceDiscovery", + "DLNAController", + "HTTPFileServer" +] diff --git a/core/device_discovery.py b/core/device_discovery.py new file mode 100644 index 0000000..45617e3 --- /dev/null +++ b/core/device_discovery.py @@ -0,0 +1,488 @@ +import socket +import time +import urllib.request +import urllib.error +import xml.etree.ElementTree as ET +from core.logger import AppLogger + +class DeviceDiscovery: + """ + DLNA/UPnP 媒体渲染器发现类 + 用于发现局域网中的投屏设备 + """ + + def __init__(self): + """ + 初始化设备发现器 + """ + self.logger = AppLogger('DeviceDiscovery') + self.discovered_devices = [] + + def discover_media_renderers(self, timeout=3): + """ + 发现局域网中的媒体渲染器(投屏设备) + 只返回 deviceType 为 urn:schemas-upnp-org:device:MediaRenderer:1 的设备 + 通过 UUID 去重 + + Args: + timeout: 搜索超时时间(秒) + + Returns: + list: 发现的设备列表 + """ + # 首先检查网络连接 + try: + test_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + test_sock.settimeout(1) + test_sock.sendto(b'test', ('239.255.255.250', 1900)) + test_sock.recvfrom(1024) + test_sock.close() + except socket.timeout: + self.logger.warning("网络可能没有响应,检查防火墙设置") + except Exception as e: + self.logger.warning(f"网络测试异常: {e}") + + devices_by_uuid = {} # 用UUID去重 + pending_devices = {} # 待处理的设备,key为location,value为(ip, headers) + + # SSDP 发现消息 + ssdp_msg = ( + "M-SEARCH * HTTP/1.1\r\n" + "HOST: 239.255.255.250:1900\r\n" + "MAN: \"ssdp:discover\"\r\n" + "MX: 3\r\n" + "ST: ssdp:all\r\n" + "USER-AGENT: Python DLNA Discovery\r\n" + "\r\n" + ) + + # 创建 UDP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + sock.bind(('0.0.0.0', 0)) + + # 发送发现请求 + sock.sendto(ssdp_msg.encode(), ('239.255.255.250', 1900)) + sock.setblocking(0) # 设置为非阻塞模式 + + self.logger.info(f"开始搜索媒体渲染器,超时时间: {timeout}秒...") + + # 使用 select 监听socket,避免while循环 + import select + start_time = time.time() + response_count = 0 + + while True: + # 计算剩余超时时间 + remaining_time = timeout - (time.time() - start_time) + if remaining_time <= 0: + break + + # 使用select监听socket + ready_to_read, _, _ = select.select([sock], [], [], remaining_time) + + if sock in ready_to_read: + try: + data, addr = sock.recvfrom(4096) + response_count += 1 + response = data.decode('utf-8', errors='ignore') + + # 解析SSDP响应头 + lines = response.split('\r\n') + headers = {} + for line in lines: + if ':' in line: + key, value = line.split(':', 1) + headers[key.strip().upper()] = value.strip() + + # 检查是否有LOCATION头 + location = headers.get('LOCATION') + if location: + pending_devices[location] = (addr[0], headers) + + except socket.error as e: + self.logger.debug(f"接收数据时出错: {e}") + continue + else: + # select 超时 + break + + sock.close() + + # 并发获取设备详情 + if pending_devices: + from concurrent.futures import ThreadPoolExecutor, as_completed + + self.logger.info(f"收到{response_count}个SSDP响应,准备并发获取{len(pending_devices)}个设备详情...") + + with ThreadPoolExecutor(max_workers=min(10, len(pending_devices))) as executor: + # 提交所有设备详情获取任务 + future_to_location = { + executor.submit(self._get_device_details, location, ip, headers): location + for location, (ip, headers) in pending_devices.items() + } + + # 收集结果 + for future in as_completed(future_to_location): + try: + device_info = future.result(timeout=5) + if device_info: + uuid = device_info.get('udn', '') + if uuid and uuid not in devices_by_uuid: + devices_by_uuid[uuid] = device_info + device_name = device_info.get('friendly_name', '未知') + self.logger.info(f"发现新设备: {device_name} (UUID: {uuid[:20]}...)") + except Exception as e: + location = future_to_location[future] + self.logger.debug(f"获取设备详情失败 {location}: {e}") + + self.logger.info(f"搜索完成: 发现{len(devices_by_uuid)}个媒体渲染器") + self.discovered_devices = list(devices_by_uuid.values()) + + return self.discovered_devices + + def _get_device_details(self, location, ip, headers): + """ + 获取单个设备的详细信息(提取出来便于并发调用) + + Args: + location: 设备描述文档URL + ip: 设备IP地址 + headers: SSDP响应头 + + Returns: + dict: 设备信息,如果不是MediaRenderer则返回None + """ + usn = headers.get('USN', '') + st = headers.get('ST', headers.get('NT', '')) + + try: + # 获取设备描述文档 + req = urllib.request.Request(location) + req.add_header('User-Agent', 'Python DLNA Client') + req.timeout = 3 + + with urllib.request.urlopen(req, timeout=3) as response: + xml_data = response.read().decode('utf-8', errors='ignore') + + # 解析 XML + root = ET.fromstring(xml_data) + ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'} + + # 提取设备信息 + device = root.find('.//upnp:device', ns) + if device is None: + self.logger.debug(f"从 {location} 未找到设备信息") + return None + + # 检查设备类型 + device_type_elem = device.find('upnp:deviceType', ns) + if device_type_elem is None or device_type_elem.text != 'urn:schemas-upnp-org:device:MediaRenderer:1': + return None # 不是 MediaRenderer 设备,跳过 + + # 提取设备信息 + friendly_name_elem = device.find('upnp:friendlyName', ns) + manufacturer_elem = device.find('upnp:manufacturer', ns) + model_name_elem = device.find('upnp:modelName', ns) + model_description_elem = device.find('upnp:modelDescription', ns) + udn_elem = device.find('upnp:UDN', ns) + + # 提取服务列表 + services = [] + service_list = root.find('.//upnp:serviceList', ns) + if service_list is not None: + for service in service_list.findall('upnp:service', ns): + service_type_elem = service.find('upnp:serviceType', ns) + if service_type_elem is not None: + services.append(service_type_elem.text) + + device_info = { + 'ip': ip, + 'location': location, + 'server': headers.get('SERVER', ''), + 'usn': usn, + 'st': st, + 'friendly_name': friendly_name_elem.text if friendly_name_elem is not None else '', + 'manufacturer': manufacturer_elem.text if manufacturer_elem is not None else '', + 'model_name': model_name_elem.text if model_name_elem is not None else '', + 'model_description': model_description_elem.text if model_description_elem is not None else '', + 'device_type': device_type_elem.text, + 'udn': udn_elem.text if udn_elem is not None else '', + 'services': services, + 'response_time': time.strftime('%H:%M:%S') + } + + self.logger.debug(f"成功获取设备详情: {device_info.get('friendly_name')} ({ip})") + return device_info + + except (urllib.error.URLError, socket.timeout, ConnectionError) as e: + self.logger.debug(f"无法访问设备描述文档 {location}: {e}") + return None + except ET.ParseError as e: + self.logger.debug(f"解析设备XML失败: {location}, 错误: {e}") + return None + except Exception as e: + self.logger.debug(f"获取设备详情时出错: {e}") + return None + + def _get_device_details_from_response(self, response, ip): + """ + 从SSDP响应中获取设备详情,只返回 MediaRenderer 设备 + + Args: + response: SSDP响应数据 + ip: 设备IP地址 + + Returns: + dict: 设备信息,如果不是MediaRenderer则返回None + """ + # 解析SSDP响应头 + lines = response.split('\r\n') + headers = {} + + for line in lines: + if ':' in line: + key, value = line.split(':', 1) + headers[key.strip().upper()] = value.strip() + + # 检查是否有LOCATION头 + location = headers.get('LOCATION') + if not location: + return None + + # 先检查USN是否包含MediaRenderer字样,快速过滤 + usn = headers.get('USN', '') + st = headers.get('ST', headers.get('NT', '')) + + try: + # 获取设备描述文档 + req = urllib.request.Request(location) + req.add_header('User-Agent', 'Python DLNA Client') + + with urllib.request.urlopen(req, timeout=4) as response: + xml_data = response.read().decode('utf-8', errors='ignore') + + # 解析 XML + root = ET.fromstring(xml_data) + ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'} + + # 提取设备信息 + device = root.find('.//upnp:device', ns) + if device is None: + self.logger.debug(f"从 {location} 未找到设备信息") + return None + + # 检查设备类型 + device_type_elem = device.find('upnp:deviceType', ns) + if device_type_elem is None or device_type_elem.text != 'urn:schemas-upnp-org:device:MediaRenderer:1': + return None # 不是 MediaRenderer 设备,跳过 + + # 提取设备信息 + friendly_name_elem = device.find('upnp:friendlyName', ns) + manufacturer_elem = device.find('upnp:manufacturer', ns) + model_name_elem = device.find('upnp:modelName', ns) + model_description_elem = device.find('upnp:modelDescription', ns) + udn_elem = device.find('upnp:UDN', ns) + + # 提取服务列表 + services = [] + service_list = root.find('.//upnp:serviceList', ns) + if service_list is not None: + for service in service_list.findall('upnp:service', ns): + service_type_elem = service.find('upnp:serviceType', ns) + if service_type_elem is not None: + services.append(service_type_elem.text) + + device_info = { + 'ip': ip, + 'location': location, + 'server': headers.get('SERVER', ''), + 'usn': usn, + 'st': st, + 'friendly_name': friendly_name_elem.text if friendly_name_elem is not None else '', + 'manufacturer': manufacturer_elem.text if manufacturer_elem is not None else '', + 'model_name': model_name_elem.text if model_name_elem is not None else '', + 'model_description': model_description_elem.text if model_description_elem is not None else '', + 'device_type': device_type_elem.text, + 'udn': udn_elem.text if udn_elem is not None else '', + 'services': services, + 'response_time': time.strftime('%H:%M:%S') + } + + self.logger.debug(f"成功获取设备详情: {device_info.get('friendly_name')} ({ip})") + return device_info + + except urllib.error.URLError as e: + self.logger.debug(f"无法访问设备描述文档 {location}: {e}") + return None + except socket.timeout: + self.logger.debug(f"访问设备描述文档超时: {location}") + return None + except ET.ParseError as e: + self.logger.debug(f"解析设备XML失败: {location}, 错误: {e}") + return None + except Exception as e: + self.logger.debug(f"获取设备详情时出错: {e}") + return None + + def display_discovered_devices(self): + """ + 显示已发现的媒体渲染器列表 + """ + if not self.discovered_devices: + self.logger.info("未发现任何媒体渲染器(投屏设备)") + return + + self.logger.info(f"发现 {len(self.discovered_devices)} 个媒体渲染器(投屏设备):") + + for i, device in enumerate(self.discovered_devices, 1): + device_name = device.get('friendly_name', '未知') + device_ip = device.get('ip', '未知') + manufacturer = device.get('manufacturer', '未知') + model = device.get('model_name', '未知') + model_desc = device.get('model_description', '') + + self.logger.info(f"设备{i}:{device_name},IP地址:{device_ip},制造商:{manufacturer},型号:{model},描述:{model_desc}") + + def check_device_isline(self, devices, timeout=2, max_threads=5): + """ + 快速检测指定的设备列表是否在线 + 通过访问设备的 location URL 来检测 + + Args: + devices: 待检测的设备列表 + timeout: 每个设备的检测超时时间(秒) + max_threads: 最大并发线程数,默认5个 + + Returns: + dict: 包含在线设备列表和离线设备列表的字典 + """ + if not devices: + self.logger.info("没有需要检测的设备") + return [] + + from concurrent.futures import ThreadPoolExecutor, as_completed + + self.logger.info(f"开始检测 {len(devices)} 个设备是否在线,超时时间: {timeout}秒") + + online_devices = [] + + def check_single_device(device): + """检测单个设备是否在线""" + uuid = device.get('udn', '未知') + device_name = device.get('friendly_name', '未知') + location = device.get('location', '') + ip = device.get('ip', '未知') + + if not location: + self.logger.debug(f"设备 {device_name} 没有location信息") + return device, False + + try: + # 设置超时访问设备的location URL + req = urllib.request.Request(location) + req.add_header('User-Agent', 'Python DLNA Availability Checker') + req.timeout = timeout + + with urllib.request.urlopen(req, timeout=timeout) as response: + # 检查响应状态码 + if response.status == 200: + self.logger.debug(f"设备 {device_name} ({ip}) 在线") + return device, True + else: + self.logger.debug(f"设备 {device_name} ({ip}) 响应异常: {response.status}") + return device, False + + except urllib.error.URLError as e: + self.logger.debug(f"设备 {device_name} ({ip}) 无法访问: {e}") + return device, False + except socket.timeout: + self.logger.debug(f"设备 {device_name} ({ip}) 检测超时") + return device, False + except Exception as e: + self.logger.debug(f"设备 {device_name} ({ip}) 检测出错: {e}") + return device, False + + # 使用线程池并发检测设备 + with ThreadPoolExecutor(max_workers=max_threads) as executor: + # 提交所有检测任务 + future_to_device = { + executor.submit(check_single_device, device): device + for device in devices + } + + # 收集结果 + for future in as_completed(future_to_device): + try: + device, is_online = future.result(timeout=timeout + 1) + if is_online: + online_devices.append(device) + self.logger.info(f"设备:{device.get('friendly_name')}[{device.get('ip')}]在线") + except Exception as e: + device = future_to_device[future] + self.logger.error(f"检测设备 {device.get('friendly_name', '未知')} 时出错: {e}") + + return online_devices + + def test_raw_discovery(self, timeout=5): + """ + 测试原始的SSDP发现,查看所有响应 + """ + ssdp_msg = ( + "M-SEARCH * HTTP/1.1\r\n" + "HOST: 239.255.255.250:1900\r\n" + "MAN: \"ssdp:discover\"\r\n" + "MX: 3\r\n" + "ST: ssdp:all\r\n" # 搜索所有设备 + "USER-AGENT: Python DLNA Discovery\r\n" + "\r\n" + ) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + sock.bind(('', 0)) + + # 发送到所有可能的端口 + sock.sendto(ssdp_msg.encode(), ('239.255.255.250', 1900)) + sock.settimeout(timeout) + + responses = [] + start_time = time.time() + + while time.time() - start_time < timeout: + try: + data, addr = sock.recvfrom(4096) + response = data.decode('utf-8', errors='ignore') + responses.append((addr, response)) + + # 打印原始响应 + print(f"\n收到来自 {addr} 的响应:") + print(response[:200] + "..." if len(response) > 200 else response) + + # 尝试解析USN + lines = response.split('\r\n') + for line in lines: + if 'USN:' in line or 'ST:' in line or 'LOCATION:' in line: + print(f" {line.strip()}") + + except socket.timeout: + break + except Exception as e: + print(f"接收错误: {e}") + + sock.close() + print(f"\n总共收到 {len(responses)} 个原始响应") + return responses + + +def main(): + discovery = DeviceDiscovery() + devices = discovery.discover_media_renderers() + discovery.display_discovered_devices() + + print(devices) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/core/dlna_controller.py b/core/dlna_controller.py new file mode 100644 index 0000000..900f40c --- /dev/null +++ b/core/dlna_controller.py @@ -0,0 +1,658 @@ +import urllib.parse +import xml.etree.ElementTree as ET +import requests +from urllib.parse import urlparse + +from core import AppLogger + + +class DLNAController: + """ + DLNA 媒体渲染器控制器 + 只负责控制已发现的设备,不包含发现功能 + """ + + def __init__(self, device_info): + """ + 初始化控制器 + + Args: + device_info: 设备信息字典,必须包含: + - 'location': 设备描述文档的URL + - 'friendly_name': 设备友好名称 (可选) + - 'ip': 设备IP地址 (可选) + """ + if 'location' not in device_info: + raise ValueError("device_info 必须包含 'location' 字段") + + self.device_info = device_info + self.location = device_info['location'] + self.base_url = None + self.av_transport_url = None + self.rendering_control_url = None + self.connection_manager_url = None + + # 解析设备信息 + self._parse_device_info() + + self.logger = AppLogger('DLNAController') + + def _parse_device_info(self): + """ + 解析设备描述文档,提取控制URL + """ + try: + # 获取基础URL + parsed = urlparse(self.location) + self.base_url = f"{parsed.scheme}://{parsed.netloc}" + + # 获取设备描述文档 + response = requests.get(self.location, timeout=5) + response.raise_for_status() + + # 解析XML + root = ET.fromstring(response.content) + ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'} + + # 查找服务列表 + service_list = root.find('.//upnp:device/upnp:serviceList', ns) + if service_list is None: + raise ValueError("设备描述中未找到服务列表") + + # 遍历服务,提取控制URL + for service in service_list.findall('upnp:service', ns): + service_type_elem = service.find('upnp:serviceType', ns) + control_url_elem = service.find('upnp:controlURL', ns) + + if service_type_elem is None or control_url_elem is None: + continue + + service_type = service_type_elem.text + control_url = control_url_elem.text + + # 确保URL是完整的 + if control_url.startswith('/'): + control_url = f"{self.base_url}{control_url}" + + if 'AVTransport' in service_type: + self.av_transport_url = control_url + elif 'RenderingControl' in service_type: + self.rendering_control_url = control_url + elif 'ConnectionManager' in service_type: + self.connection_manager_url = control_url + + # 验证必要的服务是否存在 + if not self.av_transport_url: + raise ValueError("设备不支持 AVTransport 服务") + + except requests.exceptions.RequestException as e: + raise ConnectionError(f"无法连接设备: {e}") + except ET.ParseError as e: + raise ValueError(f"设备描述文档格式错误: {e}") + except Exception as e: + raise RuntimeError(f"解析设备信息失败: {e}") + + def _send_soap_request(self, service_url, action, body): + """ + 发送SOAP请求到设备 + + Args: + service_url: 服务控制URL + action: SOAP Action + body: SOAP Body内容 + + Returns: + tuple: (success, response_xml) - 成功标志和响应XML + """ + soap_envelope = f""" + + + {body} + +""" + + headers = { + 'Content-Type': 'text/xml; charset="utf-8"', + 'SOAPAction': f'"{action}"', + 'User-Agent': 'Python DLNA Controller' + } + + try: + response = requests.post(service_url, data=soap_envelope, + headers=headers, timeout=5) + response.raise_for_status() + return True, response.text + except requests.exceptions.Timeout: + return False, "请求超时" + except requests.exceptions.ConnectionError: + return False, "连接失败" + except Exception as e: + return False, str(e) + + # ================== AVTransport 服务方法 ================== + + def set_av_transport_uri(self, media_url, metadata=""): + """ + 设置播放URI + + Args: + media_url: 媒体文件的URL + metadata: 媒体元数据(可选) + + Returns: + bool: 是否成功 + """ + if not self.av_transport_url: + return False + + # 转义XML特殊字符 + media_url_escaped = media_url.replace('&', '&').replace('<', '<').replace('>', '>') + media_url_escaped = urllib.parse.quote(media_url_escaped,safe=':/') + metadata_escaped = metadata.replace('&', '&').replace('<', '<').replace('>', '>') + + soap_body = f""" + 0 + {media_url_escaped} + {metadata_escaped} +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + self.logger.info(f"媒体播放地址:{media_url_escaped}") + self.logger.info(f"DLNA控制响应:{response}") + + return success + + def play(self, speed="1"): + """ + 开始播放 + + Args: + speed: 播放速度,默认为1 + + Returns: + bool: 是否成功 + """ + if not self.av_transport_url: + return False + + soap_body = f""" + 0 + {speed} +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#Play" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + + return success + + def pause(self): + """ + 暂停播放 + + Returns: + bool: 是否成功 + """ + if not self.av_transport_url: + return False + + soap_body = """ + 0 +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#Pause" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + + return success + + def stop(self): + """ + 停止播放 + + Returns: + bool: 是否成功 + """ + if not self.av_transport_url: + return False + + soap_body = """ + 0 +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#Stop" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + + return success + + def seek(self, unit, target): + """ + 跳转到指定位置 + + Args: + unit: 时间单位,如 "REL_TIME"(相对时间)或 "TRACK_NR"(曲目号) + target: 目标位置,如 "00:05:30" 或 "2"(第2首) + + Returns: + bool: 是否成功 + """ + if not self.av_transport_url: + return False + + target_escaped = target.replace('&', '&').replace('<', '<').replace('>', '>') + + soap_body = f""" + 0 + {unit} + {target_escaped} +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#Seek" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + + return success + + def get_transport_info(self): + """ + 获取传输状态信息 + + Returns: + dict: 传输状态信息,包含: + - state: 当前传输状态 + - status: 当前传输状态 + - speed: 当前播放速度 + None: 获取失败 + """ + if not self.av_transport_url: + return None + + soap_body = """ + 0 +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#GetTransportInfo" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + + if not success: + return None + + try: + # 解析响应XML + root = ET.fromstring(response) + ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/', + 'u': 'urn:schemas-upnp-org:service:AVTransport:1'} + + # 查找Body中的响应 + body = root.find('.//s:Body', ns) + if body is None: + return None + + # 获取状态信息 + current_transport_state = body.find('.//CurrentTransportState') + current_transport_status = body.find('.//CurrentTransportStatus') + current_speed = body.find('.//CurrentSpeed') + + return { + 'state': current_transport_state.text if current_transport_state is not None else '未知', + 'status': current_transport_status.text if current_transport_status is not None else '未知', + 'speed': current_speed.text if current_speed is not None else '未知' + } + + except ET.ParseError: + return None + + def get_position_info(self): + """ + 获取当前位置信息 + + Returns: + dict: 位置信息,包含: + - track: 当前曲目 + - track_duration: 曲目时长 + - track_meta_data: 曲目元数据 + - track_uri: 曲目URI + - rel_time: 相对时间 + - abs_time: 绝对时间 + - rel_count: 相对计数 + - abs_count: 绝对计数 + None: 获取失败 + """ + if not self.av_transport_url: + return None + + soap_body = """ + 0 +""" + + action = "urn:schemas-upnp-org:service:AVTransport:1#GetPositionInfo" + success, response = self._send_soap_request(self.av_transport_url, action, soap_body) + + if not success: + return None + + try: + root = ET.fromstring(response) + ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/', + 'u': 'urn:schemas-upnp-org:service:AVTransport:1'} + + body = root.find('.//s:Body', ns) + if body is None: + return None + + # 提取位置信息 + info = {} + fields = [ + 'Track', 'TrackDuration', 'TrackMetaData', 'TrackURI', + 'RelTime', 'AbsTime', 'RelCount', 'AbsCount' + ] + + for field in fields: + elem = body.find(f'.//{field}') + info[field.lower()] = elem.text if elem is not None else '' + + return info + + except ET.ParseError: + return None + + # ================== RenderingControl 服务方法 ================== + + def set_volume(self, volume, channel='Master'): + """ + 设置音量 + + Args: + volume: 音量值 (0-100) + channel: 声道,默认为 'Master' + + Returns: + bool: 是否成功 + """ + if not self.rendering_control_url: + return False + + # 确保音量在0-100范围内 + volume = max(0, min(100, int(volume))) + + soap_body = f""" + 0 + {channel} + {volume} +""" + + action = "urn:schemas-upnp-org:service:RenderingControl:1#SetVolume" + success, response = self._send_soap_request(self.rendering_control_url, action, soap_body) + + return success + + def get_volume(self, channel='Master'): + """ + 获取当前音量 + + Args: + channel: 声道,默认为 'Master' + + Returns: + int: 音量值 (0-100),None表示获取失败 + """ + if not self.rendering_control_url: + return None + + soap_body = f""" + 0 + {channel} +""" + + action = "urn:schemas-upnp-org:service:RenderingControl:1#GetVolume" + success, response = self._send_soap_request(self.rendering_control_url, action, soap_body) + + if not success: + return None + + try: + root = ET.fromstring(response) + ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/', + 'u': 'urn:schemas-upnp-org:service:RenderingControl:1'} + + body = root.find('.//s:Body', ns) + if body is None: + return None + + current_volume = body.find('.//CurrentVolume') + if current_volume is not None and current_volume.text: + return int(current_volume.text) + + except (ET.ParseError, ValueError): + pass + + return None + + def set_mute(self, mute, channel='Master'): + """ + 设置静音 + + Args: + mute: 是否静音 (True/False) + channel: 声道,默认为 'Master' + + Returns: + bool: 是否成功 + """ + if not self.rendering_control_url: + return False + + desired_mute = "1" if mute else "0" + + soap_body = f""" + 0 + {channel} + {desired_mute} +""" + + action = "urn:schemas-upnp-org:service:RenderingControl:1#SetMute" + success, response = self._send_soap_request(self.rendering_control_url, action, soap_body) + + return success + + def get_mute(self, channel='Master'): + """ + 获取静音状态 + + Args: + channel: 声道,默认为 'Master' + + Returns: + bool: 是否静音,None表示获取失败 + """ + if not self.rendering_control_url: + return None + + soap_body = f""" + 0 + {channel} +""" + + action = "urn:schemas-upnp-org:service:RenderingControl:1#GetMute" + success, response = self._send_soap_request(self.rendering_control_url, action, soap_body) + + if not success: + return None + + try: + root = ET.fromstring(response) + ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/', + 'u': 'urn:schemas-upnp-org:service:RenderingControl:1'} + + body = root.find('.//s:Body', ns) + if body is None: + return None + + current_mute = body.find('.//CurrentMute') + if current_mute is not None and current_mute.text: + return current_mute.text == "1" + + except ET.ParseError: + pass + + return None + + # ================== ConnectionManager 服务方法 ================== + + def get_protocol_info(self): + """ + 获取设备支持的协议信息 + + Returns: + dict: 协议信息,包含: + - source: 源协议信息 + - sink: 接收协议信息 + None: 获取失败 + """ + if not self.connection_manager_url: + return None + + soap_body = """ +""" + + action = "urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo" + success, response = self._send_soap_request(self.connection_manager_url, action, soap_body) + + if not success: + return None + + try: + root = ET.fromstring(response) + ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/', + 'u': 'urn:schemas-upnp-org:service:ConnectionManager:1'} + + body = root.find('.//s:Body', ns) + if body is None: + return None + + source = body.find('.//Source') + sink = body.find('.//Sink') + + return { + 'source': source.text if source is not None else '', + 'sink': sink.text if sink is not None else '' + } + + except ET.ParseError: + return None + + # ================== 高级方法 ================== + + def play_media(self, media_url, metadata="", wait=1): + """ + 播放媒体文件(组合操作) + + Args: + media_url: 媒体文件URL + metadata: 媒体元数据 + wait: 设置URI后的等待时间(秒) + + Returns: + bool: 是否成功 + """ + import time + + if self.set_av_transport_uri(media_url, metadata): + time.sleep(wait) + return self.play() + return False + + def get_device_status(self): + """ + 获取设备完整状态 + + Returns: + dict: 设备状态信息 + """ + status = { + 'device_info': { + 'friendly_name': self.device_info.get('friendly_name', '未知'), + 'ip': self.device_info.get('ip', '未知'), + 'location': self.device_info.get('location', '未知') + }, + 'transport_info': self.get_transport_info(), + 'position_info': self.get_position_info(), + 'volume': self.get_volume(), + 'mute': self.get_mute(), + 'protocol_info': self.get_protocol_info(), + 'services': { + 'av_transport': self.av_transport_url is not None, + 'rendering_control': self.rendering_control_url is not None, + 'connection_manager': self.connection_manager_url is not None + } + } + + return status + + def get_supported_formats(self): + """ + 获取设备支持的媒体格式 + + Returns: + list: 支持的格式列表 + """ + protocol_info = self.get_protocol_info() + if not protocol_info or 'sink' not in protocol_info: + return [] + + # 解析sink字段,提取支持的格式 + sink_info = protocol_info['sink'] + formats = [] + + # DLNA格式通常是逗号分隔的 + for fmt in sink_info.split(','): + fmt = fmt.strip() + if fmt: + formats.append(fmt) + + return formats + + def can_play_format(self, media_url): + """ + 检查设备是否可能支持指定格式 + + Args: + media_url: 媒体URL + + Returns: + bool: 是否可能支持 + """ + import mimetypes + + # 获取文件扩展名 + if '.' in media_url: + ext = media_url.split('.')[-1].lower() + + # 常见扩展名到MIME类型的映射 + ext_to_mime = { + 'mp4': 'video/mp4', + 'mp3': 'audio/mp3', + 'm4a': 'audio/mp4', + 'wav': 'audio/wav', + 'flac': 'audio/flac', + 'mkv': 'video/x-matroska', + 'avi': 'video/x-msvideo', + 'mov': 'video/quicktime', + 'wmv': 'video/x-ms-wmv', + 'flv': 'video/x-flv', + 'webm': 'video/webm', + 'ogg': 'audio/ogg', + 'oga': 'audio/ogg', + 'ogv': 'video/ogg', + 'm3u8': 'application/x-mpegURL', + 'mpd': 'application/dash+xml' + } + + mime_type = ext_to_mime.get(ext) + if mime_type: + # 检查设备是否支持该MIME类型 + supported_formats = self.get_supported_formats() + for fmt in supported_formats: + if mime_type in fmt: + return True + + return False \ No newline at end of file diff --git a/core/http_file_server.py b/core/http_file_server.py new file mode 100644 index 0000000..bf0cbc9 --- /dev/null +++ b/core/http_file_server.py @@ -0,0 +1,693 @@ +# core/http_file_server.py +""" +支持流式传输的HTTP文件服务器,适合大文件投屏 +支持动态修改根目录和流式传输 +""" +import http.server +import socketserver +import threading +import os +import urllib.parse +import mimetypes +import time +from typing import Optional, Tuple +from core.logger import AppLogger + +class StreamingHTTPRequestHandler(http.server.BaseHTTPRequestHandler): + """支持流式传输的HTTP请求处理器""" + + # 协议版本 + protocol_version = 'HTTP/1.1' + + # 支持的MIME类型映射 + VIDEO_MIME_TYPES = { + '.mp4': 'video/mp4', + '.mkv': 'video/x-matroska', + '.avi': 'video/x-msvideo', + '.mov': 'video/quicktime', + '.wmv': 'video/x-ms-wmv', + '.flv': 'video/x-flv', + '.webm': 'video/webm', + '.m4v': 'video/x-m4v', + '.3gp': 'video/3gpp', + '.ts': 'video/mp2t', + '.m2ts': 'video/mp2t', + '.mts': 'video/mp2t', + } + + AUDIO_MIME_TYPES = { + '.mp3': 'audio/mpeg', + '.wav': 'audio/wav', + '.flac': 'audio/flac', + '.aac': 'audio/aac', + '.ogg': 'audio/ogg', + '.m4a': 'audio/mp4', + '.wma': 'audio/x-ms-wma', + } + + IMAGE_MIME_TYPES = { + '.jpg': 'image/jpeg', + '.jpeg': 'image/jpeg', + '.png': 'image/png', + '.gif': 'image/gif', + '.bmp': 'image/bmp', + '.webp': 'image/webp', + } + + def __init__(self, *args, **kwargs): + self.extensions_map = {} + self.extensions_map.update(self.VIDEO_MIME_TYPES) + self.extensions_map.update(self.AUDIO_MIME_TYPES) + self.extensions_map.update(self.IMAGE_MIME_TYPES) + + # 文本和其他类型 + self.extensions_map.update({ + '.html': 'text/html', + '.htm': 'text/html', + '.css': 'text/css', + '.js': 'application/javascript', + '.json': 'application/json', + '.txt': 'text/plain', + '.xml': 'application/xml', + '.pdf': 'application/pdf', + '.ico': 'image/x-icon', + '.svg': 'image/svg+xml', + }) + + self.logger = AppLogger('StreamingHTTPServer') + super().__init__(*args, **kwargs) + + def get_server_root(self) -> str: + """获取服务器当前根目录""" + if hasattr(self.server, 'current_directory'): + return self.server.current_directory + return "." + + def translate_path(self, path: str) -> str: + """ + 转换URL路径为文件系统路径 + + Args: + path: URL路径 + + Returns: + 文件系统路径,如果无效则返回空字符串 + """ + try: + # 解析URL + parsed_path = urllib.parse.urlparse(path) + url_path = urllib.parse.unquote(parsed_path.path) + + # 获取根目录 + root_dir = self.get_server_root() + + # 去掉开头的斜杠 + if url_path.startswith('/'): + url_path = url_path[1:] + + # 安全检查:防止目录遍历攻击 + if '..' in url_path: + self.logger.warning(f"检测到目录遍历攻击: {path}") + return "" + + # 构建完整路径 + full_path = os.path.join(root_dir, url_path) + + # 标准化路径 + full_path = os.path.normpath(full_path) + + # 再次安全检查:确保路径在根目录下 + abs_root = os.path.abspath(root_dir) + abs_path = os.path.abspath(full_path) + + if not abs_path.startswith(abs_root): + self.logger.warning(f"路径超出根目录范围: {full_path}") + return "" + + return full_path + + except Exception as e: + self.logger.error(f"路径转换失败: {e}") + return "" + + def guess_mime_type(self, file_path: str) -> str: + """ + 根据文件扩展名猜测MIME类型 + + Args: + file_path: 文件路径 + + Returns: + MIME类型字符串 + """ + # 获取扩展名 + _, ext = os.path.splitext(file_path) + ext = ext.lower() + + # 使用自定义映射 + if ext in self.extensions_map: + return self.extensions_map[ext] + + # 使用系统猜测 + mime_type, _ = mimetypes.guess_type(file_path) + if mime_type: + return mime_type + + # 默认类型 + return 'application/octet-stream' + + def parse_range_header(self, range_header: str, file_size: int) -> Optional[Tuple[int, int]]: + """ + 解析Range请求头 + + Args: + range_header: Range头值 + file_size: 文件大小 + + Returns: + (start, end)元组,或None表示无效范围 + """ + if not range_header: + return None + + try: + # 格式: bytes=start-end + if range_header.startswith('bytes='): + range_str = range_header[6:] # 去掉'bytes=' + + if '-' in range_str: + start_str, end_str = range_str.split('-', 1) + + start = 0 + end = file_size - 1 + + if start_str: + start = int(start_str) + if end_str: + end = int(end_str) + else: + # 格式: bytes=start- + end = file_size - 1 + + # 验证范围 + if start < 0 or end >= file_size or start > end: + return None + + return (start, end) + + except Exception as e: + self.logger.error(f"解析Range头失败: {e}") + + return None + + def send_file_streaming(self, file_path: str, start_pos: int = 0, end_pos: Optional[int] = None): + """ + 流式发送文件内容 + + Args: + file_path: 文件路径 + start_pos: 开始位置 + end_pos: 结束位置,None表示到文件末尾 + """ + try: + file_size = os.path.getsize(file_path) + + # 计算实际发送范围 + if end_pos is None: + end_pos = file_size - 1 + + content_length = end_pos - start_pos + 1 + + # 打开文件 + with open(file_path, 'rb') as f: + # 定位到开始位置 + f.seek(start_pos) + + # 设置传输编码为分块传输(Chunked) + self.send_response(200 if start_pos == 0 and end_pos == file_size - 1 else 206) + + # 对于范围请求,发送Content-Range头 + if start_pos > 0 or end_pos < file_size - 1: + self.send_header('Content-Range', f'bytes {start_pos}-{end_pos}/{file_size}') + self.send_header('Accept-Ranges', 'bytes') + + # 设置Content-Length而不是使用Transfer-Encoding: chunked + # 这样可以支持视频播放器的进度控制和跳转 + self.send_header('Content-Length', str(content_length)) + self.send_header('Content-Type', self.guess_mime_type(file_path)) + self.send_header('Accept-Ranges', 'bytes') + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS') + self.send_header('Access-Control-Allow-Headers', 'Range') + self.send_header('Access-Control-Expose-Headers', 'Content-Length, Content-Range') + + # 缓存控制头 + self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') + self.send_header('Pragma', 'no-cache') + self.send_header('Expires', '0') + + self.end_headers() + + # 流式发送数据 + remaining = content_length + chunk_size = 1024 * 256 # 256KB chunk size for streaming + + while remaining > 0: + # 读取数据块 + read_size = min(chunk_size, remaining) + data = f.read(read_size) + + if not data: + break + + # 发送数据块 + try: + self.wfile.write(data) + self.wfile.flush() # 确保数据立即发送 + except (ConnectionError, BrokenPipeError): + self.logger.debug("客户端断开连接") + break + + remaining -= len(data) + + # 可选:小延迟以避免占用太多CPU + if read_size == chunk_size: + time.sleep(0.001) # 1ms延迟 + + self.logger.debug(f"文件传输完成: {file_path} ({content_length} bytes)") + + except Exception as e: + self.logger.error(f"流式发送文件失败: {e}") + raise + + def do_GET(self): + """处理GET请求""" + try: + client_ip = self.client_address[0] + self.logger.debug(f"GET请求: {self.path} from {client_ip}") + + # 转换路径 + file_path = self.translate_path(self.path) + if not file_path: + self.send_error(404, "文件未找到") + return + + # 检查文件是否存在 + if not os.path.exists(file_path): + self.send_error(404, f"文件不存在: {self.path}") + return + + if not os.path.isfile(file_path): + self.send_error(403, f"不是文件: {self.path}") + return + + # 获取文件信息 + file_size = os.path.getsize(file_path) + mime_type = self.guess_mime_type(file_path) + + # 检查Range请求 + range_header = self.headers.get('Range') + range_info = self.parse_range_header(range_header, file_size) + + # 如果是视频或音频文件,支持范围请求 + is_media_file = any(ext in file_path.lower() for ext in + ['.mp4', '.mkv', '.avi', '.mov', '.mp3', '.wav', '.flac']) + + if is_media_file and range_info: + # 范围请求 + start_pos, end_pos = range_info + self.send_file_streaming(file_path, start_pos, end_pos) + else: + # 完整文件请求 + # 对于大文件,也使用流式传输 + self.send_file_streaming(file_path, 0, file_size - 1) + + except ConnectionError: + self.logger.debug("客户端断开连接") + except Exception as e: + self.logger.error(f"处理GET请求失败: {e}") + if not self.headers_sent: + self.send_error(500, "内部服务器错误") + + def do_HEAD(self): + """处理HEAD请求""" + try: + self.logger.debug(f"HEAD请求: {self.path}") + + # 转换路径 + file_path = self.translate_path(self.path) + if not file_path: + self.send_error(404, "文件未找到") + return + + # 检查文件是否存在 + if not os.path.exists(file_path): + self.send_error(404, f"文件不存在: {self.path}") + return + + if not os.path.isfile(file_path): + self.send_error(403, f"不是文件: {self.path}") + return + + # 获取文件信息 + file_size = os.path.getsize(file_path) + mime_type = self.guess_mime_type(file_path) + + # 设置响应头 + self.send_response(200) + self.send_header('Content-Type', mime_type) + self.send_header('Content-Length', str(file_size)) + self.send_header('Accept-Ranges', 'bytes') + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS') + self.end_headers() + + except Exception as e: + self.logger.error(f"处理HEAD请求失败: {e}") + self.send_error(500, "内部服务器错误") + + def do_OPTIONS(self): + """处理OPTIONS请求(CORS预检)""" + self.send_response(200) + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS') + self.send_header('Access-Control-Allow-Headers', 'Range') + self.send_header('Access-Control-Max-Age', '86400') + self.end_headers() + + def log_message(self, format, *args): + """重写日志消息""" + # 减少日志输出,避免性能影响 + message = format % args + if 'code' in message and 'message' in message: + # 只记录错误响应 + if '200' not in message: + self.logger.info(f"HTTP响应: {message}") + +class StreamingHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + """支持流式传输和多线程的HTTP服务器""" + + allow_reuse_address = True + daemon_threads = True + + def __init__(self, server_address, request_handler_class, initial_directory="."): + """ + 初始化流式HTTP服务器 + + Args: + server_address: 服务器地址和端口 + request_handler_class: 请求处理器类 + initial_directory: 初始根目录 + """ + self.current_directory = os.path.abspath(initial_directory) + self.request_queue_size = 50 # 增加请求队列大小 + super().__init__(server_address, request_handler_class) + + def set_directory(self, directory: str) -> bool: + """ + 设置新的根目录 + + Args: + directory: 新的根目录 + + Returns: + 是否设置成功 + """ + new_dir = os.path.dirname(directory) + if os.path.exists(directory) and os.path.isdir(new_dir): + old_dir = self.current_directory + self.current_directory = new_dir + return True + return False + # 设置新的根目录 + new_dir = os.path.abspath(directory) + old_dir = self.httpd.current_directory + self.httpd.current_directory = new_dir + self.logger.info(f"HTTP服务器根目录已更改: {old_dir} -> {new_dir}") + +class StreamingFileServer: + """流式文件服务器管理器""" + + def __init__(self, port: int = 8000, bind_address: str = "0.0.0.0"): + """ + 初始化流式文件服务器 + + Args: + port: 服务器端口 + bind_address: 绑定地址 + """ + self.port = port + self.bind_address = bind_address + self.httpd: Optional[StreamingHTTPServer] = None + self.server_thread: Optional[threading.Thread] = None + self.is_running = False + self.logger = AppLogger.create_default_logger('StreamingFileServer') + + # 初始化MIME类型 + mimetypes.init() + + def start(self, directory: str = ".") -> bool: + """ + 启动流式文件服务器 + + Args: + directory: 初始根目录 + + Returns: + 是否启动成功 + """ + try: + if self.is_running: + self.logger.warning("服务器已经在运行") + return False + + # 检查目录 + if not os.path.exists(directory): + self.logger.error(f"目录不存在: {directory}") + return False + + if not os.path.isdir(directory): + self.logger.error(f"路径不是目录: {directory}") + return False + + # 创建服务器 + self.httpd = StreamingHTTPServer( + (self.bind_address, self.port), + StreamingHTTPRequestHandler, + initial_directory=directory + ) + + # 设置超时(秒) + self.httpd.timeout = 30 + + # 启动服务器线程 + self.server_thread = threading.Thread( + target=self._run_server, + daemon=True, + name="StreamingHTTPServer" + ) + self.server_thread.start() + + self.is_running = True + self.logger.info(f"流式文件服务器已启动: http://{self.bind_address}:{self.port}") + self.logger.info(f"初始根目录: {directory}") + self.logger.info("服务器支持流式传输和断点续传") + + return True + + except Exception as e: + self.logger.error(f"启动服务器失败: {e}") + return False + + def _run_server(self): + """运行服务器线程""" + try: + self.logger.info(f"服务器监听在 {self.bind_address}:{self.port}") + self.httpd.serve_forever() + except Exception as e: + if self.is_running: + self.logger.error(f"服务器错误: {e}") + finally: + self.is_running = False + + def set_root_directory(self, directory: str) -> bool: + """ + 动态设置根目录 + + Args: + directory: 新的根目录 + + Returns: + 是否设置成功 + """ + if not self.is_running or not self.httpd: + self.logger.error("服务器未运行") + return False + + try: + if not os.path.exists(directory): + self.logger.error(f"目录不存在: {directory}") + return False + + if self.httpd.set_directory(directory): + self.logger.info(f"根目录已更改为: {directory}") + return True + else: + self.logger.error(f"设置根目录失败: {directory}") + return False + + except Exception as e: + self.logger.error(f"设置根目录时出错: {e}") + return False + + def get_root_directory(self) -> str: + """获取当前根目录""" + if self.httpd: + return self.httpd.current_directory + return "." + + def stop(self): + """停止服务器""" + try: + self.is_running = False + if self.httpd: + self.httpd.shutdown() + self.httpd.server_close() + self.logger.info("流式文件服务器已停止") + except Exception as e: + self.logger.error(f"停止服务器时出错: {e}") + + def get_file_url(self, file_path: str) -> Optional[str]: + """ + 获取文件的HTTP URL + + Args: + file_path: 文件路径(绝对或相对) + + Returns: + 文件的HTTP URL,如果无效则返回None + """ + try: + # 获取当前根目录 + root_dir = self.get_root_directory() + + # 如果是绝对路径 + if os.path.isabs(file_path): + abs_file_path = os.path.abspath(file_path) + abs_root_dir = os.path.abspath(root_dir) + + # 确保文件在根目录下 + if not abs_file_path.startswith(abs_root_dir): + self.logger.warning(f"文件不在当前根目录下: {file_path}") + return None + + # 获取相对路径 + relative_path = os.path.relpath(abs_file_path, abs_root_dir) + else: + # 相对路径 + relative_path = file_path + + # 检查文件是否存在 + full_path = os.path.join(root_dir, relative_path) + if not os.path.exists(full_path) or not os.path.isfile(full_path): + self.logger.warning(f"文件不存在: {full_path}") + return None + + # URL编码路径 + encoded_path = urllib.parse.quote(relative_path.replace('\\', '/')) + + # 构建URL + return f"http://{self.bind_address}:{self.port}/{encoded_path}" + + except Exception as e: + self.logger.error(f"生成文件URL失败: {e}") + return None + + def get_server_info(self) -> dict: + """获取服务器信息""" + return { + 'is_running': self.is_running, + 'bind_address': self.bind_address, + 'port': self.port, + 'root_directory': self.get_root_directory(), + 'url': f"http://{self.bind_address}:{self.port}" + } + +# 高级流式服务器类 +class HTTPFileServer(StreamingFileServer): + """高级流式服务器,支持更多功能""" + + def __init__(self, port: int = 8000, bind_address: str = "0.0.0.0", + max_chunk_size: int = 1024 * 256): # 256KB + """ + 初始化高级流式服务器 + + Args: + port: 服务器端口 + bind_address: 绑定地址 + max_chunk_size: 最大块大小(字节) + """ + super().__init__(port, bind_address) + self.max_chunk_size = max_chunk_size + self.active_connections = 0 + self.total_served_bytes = 0 + self.stats_lock = threading.Lock() + + def _run_server(self): + """运行服务器线程,包含统计信息""" + self.logger.info(f"高级流式服务器启动,块大小: {self.max_chunk_size}字节") + super()._run_server() + + def get_stats(self) -> dict: + """获取服务器统计信息""" + with self.stats_lock: + return { + 'active_connections': self.active_connections, + 'total_served_bytes': self.total_served_bytes, + 'is_running': self.is_running, + 'root_directory': self.get_root_directory() + } + + +# 使用示例 +def test_streaming_server(): + """测试流式服务器""" + import time + + # 创建服务器 + server = HTTPFileServer(port=8080) + + print("启动流式文件服务器...") + + # 启动服务器 + if server.start("."): + print(f"服务器已启动: http://localhost:8080") + print(f"当前目录: {server.get_root_directory()}") + + # 测试获取文件URL + test_file = "test.mp4" # 假设存在一个测试文件 + file_url = server.get_file_url(test_file) + if file_url: + print(f"文件URL: {file_url}") + print("你可以使用以下命令测试:") + print(f" curl -I {file_url} # 查看文件信息") + print(f" curl --range 0-999 {file_url} # 测试范围请求") + + # 运行一段时间 + try: + while True: + time.sleep(1) + stats = server.get_stats() + print(f"\r活跃连接: {stats['active_connections']}, " + f"已传输: {stats['total_served_bytes'] / 1024 / 1024:.2f} MB", end='') + except KeyboardInterrupt: + print("\n\n正在停止服务器...") + finally: + server.stop() + print("服务器已停止") + else: + print("启动服务器失败") + + +if __name__ == "__main__": + test_streaming_server() \ No newline at end of file diff --git a/core/logger.py b/core/logger.py new file mode 100644 index 0000000..d563867 --- /dev/null +++ b/core/logger.py @@ -0,0 +1,83 @@ +# logger.py +import logging +import sys +from datetime import datetime +import os + + +class AppLogger: + """ + 应用程序日志工具类 + 提供统一的日志记录功能 + """ + + def __init__(self, name='', log_level=logging.INFO, log_to_file=False, log_file_path=None): + """ + 初始化日志记录器 + + Args: + name: 日志记录器名称 + log_level: 日志级别 + log_to_file: 是否记录到文件 + log_file_path: 日志文件路径,如果为None则使用默认路径 + """ + self.logger = logging.getLogger(name) + + # 避免重复添加处理器 + if self.logger.handlers: + return + + self.logger.setLevel(log_level) + + # 创建格式化器 + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 创建控制台处理器 + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(log_level) + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) + + # 如果需要记录到文件 + if log_to_file: + if log_file_path is None: + # 创建logs目录 + if not os.path.exists('logs'): + os.makedirs('logs') + # 生成日志文件名 + timestamp = datetime.now().strftime('%Y%m%d') + log_file_path = f'logs/{timestamp}.log' + + file_handler = logging.FileHandler(log_file_path, encoding='utf-8') + file_handler.setLevel(log_level) + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + + def info(self, message): + """记录信息级别日志""" + self.logger.info(message) + + def debug(self, message): + """记录调试级别日志""" + self.logger.debug(message) + + def warning(self, message): + """记录警告级别日志""" + self.logger.warning(message) + + def error(self, message): + """记录错误级别日志""" + self.logger.error(message) + + def critical(self, message): + """记录严重级别日志""" + self.logger.critical(message) + + def set_level(self, level): + """设置日志级别""" + self.logger.setLevel(level) + for handler in self.logger.handlers: + handler.setLevel(level) \ No newline at end of file diff --git a/icon.ico b/icon.ico new file mode 100644 index 0000000..fbbf82c Binary files /dev/null and b/icon.ico differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..5e84fcc --- /dev/null +++ b/main.py @@ -0,0 +1,30 @@ +# main.py +import os +import sys + +from PyQt5.QtWidgets import QApplication + +# 获取当前文件所在目录(main.py所在目录) +current_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, current_dir) # 添加当前目录到搜索路径 + +from ui.main_window import MainWindow + + +def main(): + """应用主函数""" + app = QApplication(sys.argv) + # 设置应用样式 + app.setStyle("Fusion") + + # 创建并显示主窗口 + window = MainWindow() + + window.show() + + # 启动应用事件循环 + sys.exit(app.exec_()) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ui/__init__.py b/ui/__init__.py new file mode 100644 index 0000000..384ae38 --- /dev/null +++ b/ui/__init__.py @@ -0,0 +1,11 @@ +""" +用户界面模块 + +包含主窗口和各种UI组件。 +""" + +__all__ = [ + "MainWindow", +] +# 导入UI组件 +from .main_window import MainWindow diff --git a/ui/main_window.py b/ui/main_window.py new file mode 100644 index 0000000..85cdf6d --- /dev/null +++ b/ui/main_window.py @@ -0,0 +1,841 @@ +import sys +import os +import threading +import socket +from datetime import datetime + +from PyQt5.QtCore import Qt +from PyQt5.QtGui import QIcon +from PyQt5.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QGroupBox, QFrame, QPushButton, \ + QListWidget, QSlider, QTextEdit, QFileDialog, QMessageBox, QListWidgetItem + +from core.device_discovery import DeviceDiscovery +from core.dlna_controller import DLNAController +from core.http_file_server import HTTPFileServer +from core.logger import AppLogger + + +class MainWindow(QMainWindow): + """智能媒体投屏器主窗口""" + + def __init__(self): + super().__init__() + self.device_discovery = None + self.dlna_controller = None + self.http_server = None + self.selected_device = None + self.selected_file = None + self.discovered_devices = [] + self.logger = AppLogger('MediaCastUI') + self.http_port = 19735 + self.is_paused = False # 新增:跟踪暂停状态 + self.init_ui() + self.init_connections() + + def init_ui(self): + """初始化UI界面""" + version = "v1.0.1" + app_name = "多媒体投屏 by yqsphp" + icon = "icon.ico" + + self.setWindowTitle(app_name) + self.setGeometry(100, 100, 700, 500) + + # 获取图标路径(支持打包和开发环境) + if getattr(sys, 'frozen', False): + # 打包后的exe环境 + base_path = sys._MEIPASS + else: + # 开发环境 + base_path = os.path.abspath(".") + + icon_path = os.path.join(base_path, icon) + + # 设置应用程序图标(影响任务栏) + self.setWindowIcon(QIcon(icon_path)) + + # Windows专用:设置AppUserModelID(非常重要!) + if sys.platform == "win32": + try: + from ctypes import windll + # 唯一ID,格式:公司名.程序名.版本 + myappid = app_name + " " + version + windll.shell32.SetCurrentProcessExplicitAppUserModelID(myappid) + except ImportError: + pass + + # 创建中央部件 + central_widget = QWidget() + self.setCentralWidget(central_widget) + + # 主布局 + main_layout = QVBoxLayout(central_widget) + main_layout.setSpacing(10) + main_layout.setContentsMargins(10, 10, 10, 10) + + # 水平布局用于左右面板 + horizontal_layout = QHBoxLayout() + horizontal_layout.setSpacing(10) + + # 左侧控制面板 + left_panel = self.create_left_panel() + horizontal_layout.addWidget(left_panel, 1) + + # 右侧信息面板 + # right_panel = self.create_right_panel() + # horizontal_layout.addWidget(right_panel, 1) + + # 将水平布局添加到垂直布局中 + main_layout.addLayout(horizontal_layout) + + # 状态栏 + self.statusBar().showMessage("准备就绪") + + # 添加弹性空间 + main_layout.addStretch() + + # 底部版权信息 + copyright_label = QLabel(f"© {app_name} {version}") + copyright_label.setAlignment(Qt.AlignCenter) + copyright_label.setStyleSheet(""" + color: #999; + font-size: 11px; + padding: 15px 0 5px 0; + border-top: 1px solid #eee; + margin-top: 10px; + """) + main_layout.addWidget(copyright_label) + + # 设置扁平化样式 + self.setStyleSheet(""" + /* 主窗口 */ + QMainWindow { + background-color: #f5f5f5; + font-family: 'Microsoft YaHei', 'Segoe UI', sans-serif; + } + + /* 分组框 - 扁平化 */ + QGroupBox { + font-size: 13px; + border: 1px solid #ddd; + border-radius: 6px; + margin-top: 8px; + padding-top: 8px; + background-color: white; + } + + QGroupBox::title { + subcontrol-origin: margin; + left: 10px; + padding: 0 8px 0 8px; + color: #333; + } + + /* 按钮 - 扁平化 */ + QPushButton { + border: none; + border-radius: 4px; + padding: 13px 10px; + font-size: 15px; + color: white; + background-color: #6c757d; + } + QPushButton:disabled { + color: #999; + background-color: #f5f5f5; + } + + /* 特殊按钮样式 */ + #browseBtn{ + background:white; + border: 1px solid #ddd; + color:blank; + } + + #refreshBtn { + background-color: #28a745; + color: white; + border: none; + } + + #startCastBtn { + background-color: #007bff; + color: white; + border: none; + } + + #pauseCastBtn { + background-color: #ffc107; + color: white; + border: none; + } + + #stopCastBtn { + background-color: #dc3545; + color: white; + border: none; + } + /*音量图标*/ + #muteBtn{ + background-color:none; + } + /* 列表控件 */ + QListWidget { + border: 1px solid #ddd; + border-radius: 4px; + background-color: white; + padding: 4px; + } + + QListWidget::item { + padding: 6px 8px; + border-radius: 4px; + border-bottom: 1px solid #f0f0f0; + } + + QListWidget::item:selected { + color: white; + background-color: #007bff; + } + + /* 文本编辑框 */ + QTextEdit { + border: 1px solid #ddd; + border-radius: 4px; + background-color: white; + font-size: 11.5px; + padding: 0 4px; + } + + /* 标签 */ + QLabel { + color: #333; + font-size: 12px; + } + + /* 输入框 */ + QLineEdit { + border: 1px solid #ddd; + border-radius: 4px; + padding: 0 8px; + font-size: 12px; + } + """) + + def create_left_panel(self): + """创建左侧控制面板(文件和设备控制合并)""" + panel = QWidget() + layout = QVBoxLayout(panel) + layout.setSpacing(10) + + # ==================== 系统信息区域 ==================== + system_group = QGroupBox("系统信息") + system_layout = QVBoxLayout() + system_layout.setSpacing(8) + + # 获取本机IP + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + s.close() + except: + local_ip = "获取失败" + + # 服务器状态 + server_frame = QFrame() + server_layout = QHBoxLayout(server_frame) + + server_layout.addWidget(QLabel("文件服务:")) + self.ip_label = QLabel(f"{local_ip}:{self.http_port}") + server_layout.addWidget(self.ip_label, 1) + + system_layout.addWidget(server_frame) + + # 文件服务状态 + self.file_service_label = QLabel("就绪") + self.file_service_label.setStyleSheet("color: #28a745;") + server_layout.addWidget(self.file_service_label, 1) + + system_group.setLayout(system_layout) + + layout.addWidget(system_group) + + # ==================== 文件和设备控制区域 ==================== + control_group = QGroupBox("文件与设备控制") + control_layout = QVBoxLayout() + control_layout.setSpacing(12) + + # 文件选择区域 + file_frame = QFrame() + file_layout = QHBoxLayout(file_frame) + file_layout.setSpacing(8) + + self.file_label = QLabel("未选择文件") + self.file_label.setWordWrap(True) + self.file_label.setMinimumHeight(25) + self.file_label.setStyleSheet(""" + border: 1px solid #dee2e6; + border-radius: 4px; + padding-left: 5px; + """) + file_layout.addWidget(self.file_label, 1) + + browse_btn = QPushButton("浏览文件") + browse_btn.setCursor(Qt.PointingHandCursor) + browse_btn.setFixedWidth(80) + browse_btn.setObjectName("browseBtn") + browse_btn.clicked.connect(self.browse_file) + file_layout.addWidget(browse_btn) + + control_layout.addWidget(file_frame) + + # 设备列表区域 + device_frame = QFrame() + device_layout = QVBoxLayout(device_frame) + device_layout.setSpacing(8) + + # 设备列表标题和刷新按钮 + device_header = QHBoxLayout() + device_header.addWidget(QLabel("可用设备列表:")) + device_header.addStretch() + device_layout.addLayout(device_header) + + # 设备列表 + self.device_list = QListWidget() + self.device_list.itemClicked.connect(self.select_device_from_list) + self.device_list.setMinimumHeight(100) + device_layout.addWidget(self.device_list) + + control_layout.addWidget(device_frame) + + # 播放控制按钮 + playback_frame = QFrame() + playback_layout = QHBoxLayout(playback_frame) + playback_layout.setSpacing(8) + + self.refresh_btn = QPushButton("刷新设备") + self.refresh_btn.setObjectName("refreshBtn") + self.refresh_btn.setCursor(Qt.PointingHandCursor) + self.refresh_btn.clicked.connect(self.refresh_devices) + + self.start_btn = QPushButton("开始投屏") + self.start_btn.setObjectName("startCastBtn") + self.start_btn.setCursor(Qt.PointingHandCursor) + self.start_btn.clicked.connect(self.start_casting) + + self.pause_btn = QPushButton("暂停投屏") + self.pause_btn.setObjectName("pauseCastBtn") + self.pause_btn.setCursor(Qt.PointingHandCursor) + self.pause_btn.clicked.connect(self.pause_casting) + self.pause_btn.setEnabled(False) + + self.stop_btn = QPushButton("结束投屏") + self.stop_btn.setObjectName("stopCastBtn") + self.stop_btn.setCursor(Qt.PointingHandCursor) + self.stop_btn.clicked.connect(self.stop_casting) + self.stop_btn.setEnabled(False) + + playback_layout.addWidget(self.refresh_btn) + playback_layout.addWidget(self.start_btn) + playback_layout.addWidget(self.pause_btn) + playback_layout.addWidget(self.stop_btn) + + control_layout.addWidget(playback_frame) + + # 音量控制 + volume_frame = QFrame() + volume_layout = QHBoxLayout(volume_frame) + volume_layout.setSpacing(8) + + volume_layout.addWidget(QLabel("音量:")) + self.volume_slider = QSlider(Qt.Horizontal) + self.volume_slider.setRange(0, 100) + self.volume_slider.setValue(50) + self.volume_slider.setCursor(Qt.PointingHandCursor) + self.volume_slider.valueChanged.connect(self.volume_changed) + volume_layout.addWidget(self.volume_slider, 1) + + self.mute_btn = QPushButton("🔊") + self.mute_btn.setCheckable(True) + self.mute_btn.setFixedWidth(40) + self.mute_btn.setObjectName("muteBtn") + self.mute_btn.clicked.connect(self.toggle_mute) + volume_layout.addWidget(self.mute_btn) + + control_layout.addWidget(volume_frame) + + control_group.setLayout(control_layout) + layout.addWidget(control_group, 1) # 主要区域占据更多空间 + + layout.addStretch() + return panel + + def create_right_panel(self): + """创建右侧信息面板""" + panel = QWidget() + layout = QVBoxLayout(panel) + layout.setSpacing(10) + + + # ==================== 系统信息区域 ==================== + system_group = QGroupBox("系统信息") + system_layout = QVBoxLayout() + system_layout.setSpacing(8) + + # 获取本机IP + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + s.close() + except: + local_ip = "获取失败" + + # 服务器状态 + server_frame = QFrame() + server_layout = QHBoxLayout(server_frame) + + server_layout.addWidget(QLabel("服务地址:")) + self.ip_label = QLabel(f"{local_ip}:{self.http_port}") + server_layout.addWidget(self.ip_label, 1) + + system_layout.addWidget(server_frame) + # 文件服务状态 + service_frame = QFrame() + service_layout = QHBoxLayout(service_frame) + + service_layout.addWidget(QLabel("文件服务:")) + self.file_service_label = QLabel("就绪") + self.file_service_label.setStyleSheet("color: #28a745;") + service_layout.addWidget(self.file_service_label, 1) + + system_layout.addWidget(service_frame) + + system_group.setLayout(system_layout) + + layout.addWidget(system_group) + # ==================== 操作日志区域 ==================== + log_group = QGroupBox("操作日志") + log_layout = QVBoxLayout() + log_layout.setSpacing(2) + + self.log_text = QTextEdit() + self.log_text.setReadOnly(True) + self.log_text.setMinimumHeight(250) + + # 添加初始日志 + self.log_message("系统", "智能媒体投屏器已启动") + self.log_message("系统", f"本地IP地址: {local_ip}") + + log_layout.addWidget(self.log_text, 1) + + # 日志控制按钮 + log_btn_layout = QHBoxLayout() + log_btn_layout.setSpacing(8) + + clear_log_btn = QPushButton("清空日志") + clear_log_btn.clicked.connect(self.log_text.clear) + + log_btn_layout.addWidget(clear_log_btn) + log_btn_layout.addStretch() + + log_layout.addLayout(log_btn_layout) + + log_group.setLayout(log_layout) + layout.addWidget(log_group, 1) # 日志区域占据更多空间 + + layout.addStretch() + + return panel + + def init_connections(self): + """初始化信号连接""" + # 设备发现 + self.refresh_devices() + + # 启动HTTP文件服务器 + self.start_http_server() + + def start_http_server(self): + """启动HTTP文件服务器""" + try: + self.http_server = HTTPFileServer(self.http_port) + if self.http_server.start(): + self.log_message("服务器", "HTTP文件服务器启动成功") + self.file_service_label.setText("运行中") + self.file_service_label.setStyleSheet("color: #20c997; font-weight: bold;") + else: + self.log_message("服务器", "HTTP文件服务器启动失败", "error") + self.file_service_label.setText("异常") + self.file_service_label.setStyleSheet("color: #dc3545; font-weight: bold;") + except Exception as e: + self.log_message("服务器", f"启动HTTP服务器出错: {e}", "error") + + def enable_refresh_button(self): + """启用刷新按钮""" + self.refresh_btn.setEnabled(True) + self.refresh_btn.setText("刷新设备") + + def on_discovery_error(self, error_message): + """设备发现错误""" + self.log_message("设备发现", f"搜索失败: {error_message}", "error") + self.enable_refresh_button() + QMessageBox.warning(self, "设备发现失败", f"搜索设备时出错:\n{error_message}") + + def refresh_devices(self): + """刷新设备列表""" + self.log_message("设备发现", "开始搜索网络中的投屏设备...", "info") + self.refresh_btn.setEnabled(False) + self.refresh_btn.setText("搜索中...") + + # 在线程中进行,避免阻塞UI + def do_quick_discovery(): + if not self.device_discovery: + self.device_discovery = DeviceDiscovery() + # 获取刷新后设备信息 + new_device = self.device_discovery.discover_media_renderers() + self.update_device_list(new_device, self.selected_device) + # 搜索完成后启用按钮 + self.enable_refresh_button() + + # 确保在UI线程中更新 + thread = threading.Thread(target=do_quick_discovery, daemon=True) + thread.start() + + def update_device_list(self, new_devices, current_selected_device=None): + """ + 更新设备列表显示,保留选中状态(使用UDN作为唯一标识) + Args: + new_devices: 搜索的设备 + current_selected_device:当前选中的设备 + """ + #当前选中设备 + current_selected_udn = None + if current_selected_device: + current_selected_udn = current_selected_device.get('udn') + + # 创建新设备字典,以UDN为键 + new_device_dict = {} + for device in new_devices: + device_udn = device.get('udn') + if device_udn: # 只有有UDN的设备才处理 + new_device_dict[device_udn] = device + + # 获取之前列表中所有设备的UDN + current_device_udn_list = [] + for i in range(self.device_list.count()): + item = self.device_list.item(i) + device = item.data(Qt.UserRole) + device_udn = device.get('udn') + if device_udn: + current_device_udn_list.append(device_udn) + + # 移除不存在的设备 + items_to_remove = [] + for i in range(self.device_list.count()): + item = self.device_list.item(i) + device = item.data(Qt.UserRole) + + # 获取设备的标识(优先使用UDN,没有则用IP) + device_udn = device.get('udn') + # 如果设备标识不在新设备列表中,标记为删除 + if device_udn not in new_device_dict: + items_to_remove.append(i) + + # 从后往前删除,避免索引变化 + for index in reversed(items_to_remove): + self.device_list.takeItem(index) + # 如果删除的是当前选中的设备,清空选中状态 + if item and item == self.device_list.currentItem(): + self.device_list.clearSelection() + self.selected_device = None + + # 更新或添加设备 + current_selected_item = None + + for device in new_devices: + # 获取设备标识(优先使用UDN,没有则用IP) + device_identifier = device.get('udn') + + device_name = device.get('friendly_name', '未知设备') + device_ip = device.get('ip', '未知IP') + + # 创建显示文本 + if device.get('udn'): + # 简化UDN显示,只显示最后一部分 + udn_parts = device_identifier.split(':') + udn_display = udn_parts[-1][:8] if len(udn_parts) > 1 else device_identifier[:8] + item_text = f"{device_name} [{device_ip}]" + else: + item_text = f"{device_name} [{device_ip}] (无UDN)" + + # 检查设备是否已存在 + found_item = None + for i in range(self.device_list.count()): + item = self.device_list.item(i) + existing_device = item.data(Qt.UserRole) + + # 比较设备标识 + existing_identifier = existing_device.get('udn') + if not existing_identifier: + existing_identifier = existing_device.get('ip', '') + + if existing_identifier == device_identifier: + found_item = item + break + + if found_item: + # 设备已存在,更新显示文本和设备数据 + if found_item.text() != item_text: + found_item.setText(item_text) + # 更新设备数据(可能有新信息) + found_item.setData(Qt.UserRole, device) + + # 检查是否是之前选中的设备 + if current_selected_udn and device.get('udn') == current_selected_udn: + current_selected_item = found_item + elif current_selected_device and device.get('ip') == current_selected_device.get('ip'): + current_selected_item = found_item + else: + # 设备不存在,添加新项目 + item = QListWidgetItem(item_text) + item.setData(Qt.UserRole, device) + self.device_list.addItem(item) + + # 检查是否是之前选中的设备 + if current_selected_udn and device.get('udn') == current_selected_udn: + current_selected_item = item + elif current_selected_device and device.get('ip') == current_selected_device.get('ip'): + current_selected_item = item + + # 恢复选中状态 + if current_selected_item: + self.device_list.setCurrentItem(current_selected_item) + self.selected_device = current_selected_item.data(Qt.UserRole) + + device_name = self.selected_device.get('friendly_name', '未知设备') + self.log_message("设备选择", f"已恢复选择设备: {device_name}") + elif self.device_list.count() > 0 and self.device_list.currentItem() is None: + # 如果之前选中的设备不存在了,清空选中 + self.device_list.clearSelection() + self.selected_device = None + elif self.device_list.count() == 0: + # 没有设备 + self.selected_device = None + + # 更新设备数量统计 + device_count = self.device_list.count() + if device_count > 0: + self.log_message("设备发现", f"发现 {device_count} 个投屏设备") + + # 如果有选中的设备,启用控制按钮 + if self.selected_device: + self.enable_control_buttons(True) + device_name = self.selected_device.get('friendly_name', '未知设备') + self.log_message("设备选择", f"当前选中: {device_name}") + else: + self.log_message("设备发现", "未发现任何投屏设备", "warning") + self.selected_device = None + self.enable_control_buttons(False) + + def select_device_from_list(self, item): + """从列表中选择设备""" + device = item.data(Qt.UserRole) + if device: + self.selected_device = device + + self.log_message("设备选择", f"已选择设备: {device.get('friendly_name', '未知')}") + + # 启用控制按钮 + self.enable_control_buttons(True) + + def browse_file(self): + """浏览并选择媒体文件""" + file_dialog = QFileDialog() + file_dialog.setFileMode(QFileDialog.ExistingFile) + + # 设置文件过滤器 + file_dialog.setNameFilter( + "媒体文件 (*.mp4 *.avi *.mkv *.mov *.mp3 *.wav *.flac *.wmv *.flv *.webm " + "*.mpeg *.mpg *.m4v *.3gp *.m4a *.aac *.ogg *.wma *.ape *.alac " + "*.amr *.opus *.ac3 *.dts *.ts *.m2ts *.vob *.ogv *.asf *.rm " + "*.rmvb *.divx *.xvid *.f4v *.m2v *.mpe *.mp2 *.mp1 *.ra *.ram " + "*.mid *.midi *.cda *.gsm *.vox *.voc *.dvf *.raw *.mka *.tta " + "*.spx *.8svx *.aiff *.au *.snd *.paf *.svx *.nist *.ircam " + "*.voc *.smp *.vox *.sou);;" + ) + + if file_dialog.exec_(): + files = file_dialog.selectedFiles() + if files: + self.selected_file = files[0] + file_name = os.path.basename(self.selected_file) + self.file_label.setText(file_name) + #选择文件后启用播放按钮 + self.start_btn.setEnabled(True) + self.log_message("文件选择", f"已选择文件: {file_name}") + + def start_casting(self): + """开始投屏""" + if not self.selected_device : + self.log_message("投屏", "请先选择设备", "error") + return + if not self.selected_file: + self.log_message("投屏", "请先选择文件", "error") + return + try: + # 如果是第一次播放,需要创建DLNA控制器并设置URI + if not self.dlna_controller: + # 创建DLNA控制器 + self.dlna_controller = DLNAController(self.selected_device) + + # 启动HTTP服务器并获取文件URL + file_name = os.path.basename(self.selected_file) + dir_bool = self.http_server.set_root_directory(self.selected_file) + self.log_message("设置服务路径", "成功" if dir_bool else "失败") + + # 获取IP地址(从IP标签中提取) + ip_text = self.ip_label.text() + file_url = f"http://{ip_text}/{file_name}" + self.log_message("设置播放地址", file_url) + + # 设置播放URI并开始播放 + if self.dlna_controller.set_av_transport_uri(file_url): + self.dlna_controller.play() + self.start_btn.setEnabled(False) + self.pause_btn.setEnabled(True) + self.stop_btn.setEnabled(True) + + self.log_message("投屏", f"开始投屏: {file_name}") + self.statusBar().showMessage(f"正在投屏: {file_name}") + self.is_paused = False # 确保不是暂停状态 + else: + QMessageBox.critical(self, "错误", "投屏失败,请检查设备连接") + return + else: + # 如果已经创建了DLNA控制器,可能是从暂停状态恢复 + if self.is_paused: + # 从暂停状态恢复播放 + if self.dlna_controller.play(): + self.is_paused = False + self.start_btn.setEnabled(False) + self.pause_btn.setEnabled(True) + self.stop_btn.setEnabled(True) + self.log_message("投屏", "继续播放") + else: + QMessageBox.critical(self, "错误", "继续播放失败") + else: + # 如果不是暂停状态,说明是停止后的重新播放 + # 需要重新设置URI + file_name = os.path.basename(self.selected_file) + ip_text = self.ip_label.text() + file_url = f"http://{ip_text}/{file_name}" + + if self.dlna_controller.set_av_transport_uri(file_url): + self.dlna_controller.play() + self.start_btn.setEnabled(False) + self.pause_btn.setEnabled(True) + self.stop_btn.setEnabled(True) + + self.log_message("投屏", f"重新开始投屏: {file_name}") + self.statusBar().showMessage(f"正在投屏: {file_name}") + else: + QMessageBox.critical(self, "错误", "重新投屏失败") + + except Exception as e: + QMessageBox.critical(self, "错误", f"投屏时出错: {str(e)}") + self.log_message("投屏", f"投屏失败: {str(e)}", "error") + + def pause_casting(self): + """暂停投屏""" + if self.dlna_controller: + if self.dlna_controller.pause(): + self.is_paused = True # 标记为暂停状态 + + self.start_btn.setText("▶ 继续播放") + self.start_btn.setEnabled(True) + self.pause_btn.setEnabled(False) + + self.log_message("投屏", "已暂停播放") + + def stop_casting(self): + """停止投屏""" + if self.dlna_controller: + if self.dlna_controller.stop(): + self.is_paused = False # 重置暂停状态 + + self.start_btn.setText("▶ 开始投屏") + self.start_btn.setEnabled(True) + self.pause_btn.setEnabled(False) + self.stop_btn.setEnabled(False) + + self.log_message("投屏", "已停止播放") + + def volume_changed(self, value): + """音量改变""" + if self.dlna_controller: + self.dlna_controller.set_volume(value) + + def toggle_mute(self, checked): + """切换静音""" + if self.dlna_controller: + self.dlna_controller.set_mute(checked) + icon = "🔇" if checked else "🔊" + self.mute_btn.setText(icon) + + status = "静音" if checked else "取消静音" + self.log_message("控制", f"{status}") + + def log_message(self, category, message, level="info"): + """记录日志消息""" + # timestamp = datetime.now().strftime("%H:%M:%S") + # + # if level == "error": + # color = "#dc3545" + # prefix = "❌" + # elif level == "warning": + # color = "#ffc107" + # prefix = "⚠️" + # else: + # color = "#0d6efd" + # prefix = "ℹ️" + # + # log_entry = f'[{timestamp}]{category}: {prefix} {message}
' + # + # self.log_text.insertHtml(log_entry) + # + # # 自动滚动到底部 + # scrollbar = self.log_text.verticalScrollBar() + # scrollbar.setValue(scrollbar.maximum()) + + # 同时输出到状态栏 + if level == "error": + self.statusBar().showMessage(f"错误: {message}", 1000) + elif level == "warning": + self.statusBar().showMessage(f"警告: {message}", 1000) + else: + self.statusBar().showMessage(message, 1000) + + def enable_control_buttons(self, enabled): + """启用/禁用控制按钮""" + self.pause_btn.setEnabled(False) + self.stop_btn.setEnabled(False) + self.volume_slider.setEnabled(enabled) + self.mute_btn.setEnabled(enabled) + + def closeEvent(self, event): + """窗口关闭事件""" + # 停止HTTP服务器 + if self.http_server: + self.http_server.stop() + + # 停止DLNA控制器 + if self.dlna_controller: + try: + self.dlna_controller.stop() + except: + pass + + self.log_message("系统", "应用程序已关闭") + event.accept() \ No newline at end of file