媒体投屏初次提交

This commit is contained in:
yqsphp
2026-01-12 10:48:20 +08:00
commit d084e84360
12 changed files with 2911 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/.idea

70
README.md Normal file
View File

@@ -0,0 +1,70 @@
# 媒体投屏器 (Media Caster)
<p align="center">
<img src="docs/images/logo.png" alt="媒体投屏器 Logo" width="200">
<br>
<em>一个优雅、高效的本地媒体投屏工具</em>
</p>
<p align="center">
<a href="#-功能特性">功能特性</a> •
<a href="#-快速开始">快速开始</a> •
<a href="#-使用指南">使用指南</a> •
<a href="#-技术架构">技术架构</a> •
<a href="#-开发指南">开发指南</a>
</p>
<p align="center">
<img src="https://img.shields.io/badge/Python-3.7+-blue.svg" alt="Python版本">
<img src="https://img.shields.io/badge/PyQt5-5.15-green.svg" alt="PyQt5">
<img src="https://img.shields.io/badge/License-MIT-yellow.svg" alt="许可证">
<img src="https://img.shields.io/badge/Platform-Windows%20%7C%20macOS%20%7C%20Linux-lightgrey.svg" alt="平台支持">
</p>
## 🌟 简介
媒体投屏器是一款基于 Python 和 PyQt5 开发的高性能媒体投屏工具,支持将本地音视频文件通过 DLNA/UPnP 协议投屏到智能电视、投影仪等设备。
### 主要特点
- 🎯 **一键投屏**:简单三步完成媒体投屏
- 📱 **设备自动发现**:智能扫描局域网内的投屏设备
-**高性能传输**:内置 HTTP 服务器,流畅播放体验
## ✨ 功能特性
### 🚀 核心功能
| 功能 | 描述 | 状态 |
|------|------|------|
| **设备发现** | 自动扫描局域网内 DLNA/UPnP 设备 | ✅ |
| **文件浏览** | 支持多种音视频格式选择 | ✅ |
| **投屏控制** | 播放、暂停、停止、音量控制 | ✅ |
| **音量调节** | 滑块控制、静音切换 | ✅ |
| **多设备支持** | 同时发现和管理多个设备 | ✅ |
### 🖥️ 系统要求
- **操作系统**: Windows 10/11, macOS 10.15+, Ubuntu 18.04+
- **Python版本**: Python 3.7 或更高版本
- **内存要求**: 最少 4GB RAM
- **网络要求**: 设备与电脑需在同一局域网
## 🚀 快速开始
### 安装方法
#### 方法一:从源代码安装(推荐)
```bash
# 1. 克隆仓库
git clone https://github.com/yqsphp/media-caster.git
cd media-caster
# 2. 创建虚拟环境
python -m venv venv
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
# 3. 运行程序
python main.py

21
__init__.py Normal file
View File

@@ -0,0 +1,21 @@
# __init__.py (项目根目录)
"""
本地媒体投屏应用
此应用支持将本地媒体文件投屏到投影仪、电视等设备。
支持多协议自动检测包括DLNA、Chromecast等设备。
"""
__version__ = "1.0.1"
__author__ = "Local Cast"
__description__ = "本地媒体投屏工具"
# 定义公开的接口
__all__ = [
"DeviceDiscovery",
"MainWindow"
]
# 导入核心功能,便于直接访问
from core.device_discovery import *
from ui.main_window import *

15
core/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
# core/__init__.py
"""
核心功能模块
包含设备发现、投屏服务和设备管理等功能类。
"""
from .logger import AppLogger
from .device_discovery import DeviceDiscovery
from .dlna_controller import DLNAController
from .http_file_server import HTTPFileServer
__all__ = [
"AppLogger",
"DeviceDiscovery",
"DLNAController",
"HTTPFileServer"
]

488
core/device_discovery.py Normal file
View File

@@ -0,0 +1,488 @@
import socket
import time
import urllib.request
import urllib.error
import xml.etree.ElementTree as ET
from core.logger import AppLogger
class DeviceDiscovery:
"""
DLNA/UPnP 媒体渲染器发现类
用于发现局域网中的投屏设备
"""
def __init__(self):
"""
初始化设备发现器
"""
self.logger = AppLogger('DeviceDiscovery')
self.discovered_devices = []
def discover_media_renderers(self, timeout=3):
"""
发现局域网中的媒体渲染器(投屏设备)
只返回 deviceType 为 urn:schemas-upnp-org:device:MediaRenderer:1 的设备
通过 UUID 去重
Args:
timeout: 搜索超时时间(秒)
Returns:
list: 发现的设备列表
"""
# 首先检查网络连接
try:
test_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
test_sock.settimeout(1)
test_sock.sendto(b'test', ('239.255.255.250', 1900))
test_sock.recvfrom(1024)
test_sock.close()
except socket.timeout:
self.logger.warning("网络可能没有响应,检查防火墙设置")
except Exception as e:
self.logger.warning(f"网络测试异常: {e}")
devices_by_uuid = {} # 用UUID去重
pending_devices = {} # 待处理的设备key为locationvalue为(ip, headers)
# SSDP 发现消息
ssdp_msg = (
"M-SEARCH * HTTP/1.1\r\n"
"HOST: 239.255.255.250:1900\r\n"
"MAN: \"ssdp:discover\"\r\n"
"MX: 3\r\n"
"ST: ssdp:all\r\n"
"USER-AGENT: Python DLNA Discovery\r\n"
"\r\n"
)
# 创建 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.bind(('0.0.0.0', 0))
# 发送发现请求
sock.sendto(ssdp_msg.encode(), ('239.255.255.250', 1900))
sock.setblocking(0) # 设置为非阻塞模式
self.logger.info(f"开始搜索媒体渲染器,超时时间: {timeout}秒...")
# 使用 select 监听socket避免while循环
import select
start_time = time.time()
response_count = 0
while True:
# 计算剩余超时时间
remaining_time = timeout - (time.time() - start_time)
if remaining_time <= 0:
break
# 使用select监听socket
ready_to_read, _, _ = select.select([sock], [], [], remaining_time)
if sock in ready_to_read:
try:
data, addr = sock.recvfrom(4096)
response_count += 1
response = data.decode('utf-8', errors='ignore')
# 解析SSDP响应头
lines = response.split('\r\n')
headers = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip().upper()] = value.strip()
# 检查是否有LOCATION头
location = headers.get('LOCATION')
if location:
pending_devices[location] = (addr[0], headers)
except socket.error as e:
self.logger.debug(f"接收数据时出错: {e}")
continue
else:
# select 超时
break
sock.close()
# 并发获取设备详情
if pending_devices:
from concurrent.futures import ThreadPoolExecutor, as_completed
self.logger.info(f"收到{response_count}个SSDP响应准备并发获取{len(pending_devices)}个设备详情...")
with ThreadPoolExecutor(max_workers=min(10, len(pending_devices))) as executor:
# 提交所有设备详情获取任务
future_to_location = {
executor.submit(self._get_device_details, location, ip, headers): location
for location, (ip, headers) in pending_devices.items()
}
# 收集结果
for future in as_completed(future_to_location):
try:
device_info = future.result(timeout=5)
if device_info:
uuid = device_info.get('udn', '')
if uuid and uuid not in devices_by_uuid:
devices_by_uuid[uuid] = device_info
device_name = device_info.get('friendly_name', '未知')
self.logger.info(f"发现新设备: {device_name} (UUID: {uuid[:20]}...)")
except Exception as e:
location = future_to_location[future]
self.logger.debug(f"获取设备详情失败 {location}: {e}")
self.logger.info(f"搜索完成: 发现{len(devices_by_uuid)}个媒体渲染器")
self.discovered_devices = list(devices_by_uuid.values())
return self.discovered_devices
def _get_device_details(self, location, ip, headers):
"""
获取单个设备的详细信息(提取出来便于并发调用)
Args:
location: 设备描述文档URL
ip: 设备IP地址
headers: SSDP响应头
Returns:
dict: 设备信息如果不是MediaRenderer则返回None
"""
usn = headers.get('USN', '')
st = headers.get('ST', headers.get('NT', ''))
try:
# 获取设备描述文档
req = urllib.request.Request(location)
req.add_header('User-Agent', 'Python DLNA Client')
req.timeout = 3
with urllib.request.urlopen(req, timeout=3) as response:
xml_data = response.read().decode('utf-8', errors='ignore')
# 解析 XML
root = ET.fromstring(xml_data)
ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'}
# 提取设备信息
device = root.find('.//upnp:device', ns)
if device is None:
self.logger.debug(f"{location} 未找到设备信息")
return None
# 检查设备类型
device_type_elem = device.find('upnp:deviceType', ns)
if device_type_elem is None or device_type_elem.text != 'urn:schemas-upnp-org:device:MediaRenderer:1':
return None # 不是 MediaRenderer 设备,跳过
# 提取设备信息
friendly_name_elem = device.find('upnp:friendlyName', ns)
manufacturer_elem = device.find('upnp:manufacturer', ns)
model_name_elem = device.find('upnp:modelName', ns)
model_description_elem = device.find('upnp:modelDescription', ns)
udn_elem = device.find('upnp:UDN', ns)
# 提取服务列表
services = []
service_list = root.find('.//upnp:serviceList', ns)
if service_list is not None:
for service in service_list.findall('upnp:service', ns):
service_type_elem = service.find('upnp:serviceType', ns)
if service_type_elem is not None:
services.append(service_type_elem.text)
device_info = {
'ip': ip,
'location': location,
'server': headers.get('SERVER', ''),
'usn': usn,
'st': st,
'friendly_name': friendly_name_elem.text if friendly_name_elem is not None else '',
'manufacturer': manufacturer_elem.text if manufacturer_elem is not None else '',
'model_name': model_name_elem.text if model_name_elem is not None else '',
'model_description': model_description_elem.text if model_description_elem is not None else '',
'device_type': device_type_elem.text,
'udn': udn_elem.text if udn_elem is not None else '',
'services': services,
'response_time': time.strftime('%H:%M:%S')
}
self.logger.debug(f"成功获取设备详情: {device_info.get('friendly_name')} ({ip})")
return device_info
except (urllib.error.URLError, socket.timeout, ConnectionError) as e:
self.logger.debug(f"无法访问设备描述文档 {location}: {e}")
return None
except ET.ParseError as e:
self.logger.debug(f"解析设备XML失败: {location}, 错误: {e}")
return None
except Exception as e:
self.logger.debug(f"获取设备详情时出错: {e}")
return None
def _get_device_details_from_response(self, response, ip):
"""
从SSDP响应中获取设备详情只返回 MediaRenderer 设备
Args:
response: SSDP响应数据
ip: 设备IP地址
Returns:
dict: 设备信息如果不是MediaRenderer则返回None
"""
# 解析SSDP响应头
lines = response.split('\r\n')
headers = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip().upper()] = value.strip()
# 检查是否有LOCATION头
location = headers.get('LOCATION')
if not location:
return None
# 先检查USN是否包含MediaRenderer字样快速过滤
usn = headers.get('USN', '')
st = headers.get('ST', headers.get('NT', ''))
try:
# 获取设备描述文档
req = urllib.request.Request(location)
req.add_header('User-Agent', 'Python DLNA Client')
with urllib.request.urlopen(req, timeout=4) as response:
xml_data = response.read().decode('utf-8', errors='ignore')
# 解析 XML
root = ET.fromstring(xml_data)
ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'}
# 提取设备信息
device = root.find('.//upnp:device', ns)
if device is None:
self.logger.debug(f"{location} 未找到设备信息")
return None
# 检查设备类型
device_type_elem = device.find('upnp:deviceType', ns)
if device_type_elem is None or device_type_elem.text != 'urn:schemas-upnp-org:device:MediaRenderer:1':
return None # 不是 MediaRenderer 设备,跳过
# 提取设备信息
friendly_name_elem = device.find('upnp:friendlyName', ns)
manufacturer_elem = device.find('upnp:manufacturer', ns)
model_name_elem = device.find('upnp:modelName', ns)
model_description_elem = device.find('upnp:modelDescription', ns)
udn_elem = device.find('upnp:UDN', ns)
# 提取服务列表
services = []
service_list = root.find('.//upnp:serviceList', ns)
if service_list is not None:
for service in service_list.findall('upnp:service', ns):
service_type_elem = service.find('upnp:serviceType', ns)
if service_type_elem is not None:
services.append(service_type_elem.text)
device_info = {
'ip': ip,
'location': location,
'server': headers.get('SERVER', ''),
'usn': usn,
'st': st,
'friendly_name': friendly_name_elem.text if friendly_name_elem is not None else '',
'manufacturer': manufacturer_elem.text if manufacturer_elem is not None else '',
'model_name': model_name_elem.text if model_name_elem is not None else '',
'model_description': model_description_elem.text if model_description_elem is not None else '',
'device_type': device_type_elem.text,
'udn': udn_elem.text if udn_elem is not None else '',
'services': services,
'response_time': time.strftime('%H:%M:%S')
}
self.logger.debug(f"成功获取设备详情: {device_info.get('friendly_name')} ({ip})")
return device_info
except urllib.error.URLError as e:
self.logger.debug(f"无法访问设备描述文档 {location}: {e}")
return None
except socket.timeout:
self.logger.debug(f"访问设备描述文档超时: {location}")
return None
except ET.ParseError as e:
self.logger.debug(f"解析设备XML失败: {location}, 错误: {e}")
return None
except Exception as e:
self.logger.debug(f"获取设备详情时出错: {e}")
return None
def display_discovered_devices(self):
"""
显示已发现的媒体渲染器列表
"""
if not self.discovered_devices:
self.logger.info("未发现任何媒体渲染器(投屏设备)")
return
self.logger.info(f"发现 {len(self.discovered_devices)} 个媒体渲染器(投屏设备):")
for i, device in enumerate(self.discovered_devices, 1):
device_name = device.get('friendly_name', '未知')
device_ip = device.get('ip', '未知')
manufacturer = device.get('manufacturer', '未知')
model = device.get('model_name', '未知')
model_desc = device.get('model_description', '')
self.logger.info(f"设备{i}:{device_name},IP地址:{device_ip},制造商:{manufacturer},型号:{model},描述:{model_desc}")
def check_device_isline(self, devices, timeout=2, max_threads=5):
"""
快速检测指定的设备列表是否在线
通过访问设备的 location URL 来检测
Args:
devices: 待检测的设备列表
timeout: 每个设备的检测超时时间(秒)
max_threads: 最大并发线程数默认5个
Returns:
dict: 包含在线设备列表和离线设备列表的字典
"""
if not devices:
self.logger.info("没有需要检测的设备")
return []
from concurrent.futures import ThreadPoolExecutor, as_completed
self.logger.info(f"开始检测 {len(devices)} 个设备是否在线,超时时间: {timeout}")
online_devices = []
def check_single_device(device):
"""检测单个设备是否在线"""
uuid = device.get('udn', '未知')
device_name = device.get('friendly_name', '未知')
location = device.get('location', '')
ip = device.get('ip', '未知')
if not location:
self.logger.debug(f"设备 {device_name} 没有location信息")
return device, False
try:
# 设置超时访问设备的location URL
req = urllib.request.Request(location)
req.add_header('User-Agent', 'Python DLNA Availability Checker')
req.timeout = timeout
with urllib.request.urlopen(req, timeout=timeout) as response:
# 检查响应状态码
if response.status == 200:
self.logger.debug(f"设备 {device_name} ({ip}) 在线")
return device, True
else:
self.logger.debug(f"设备 {device_name} ({ip}) 响应异常: {response.status}")
return device, False
except urllib.error.URLError as e:
self.logger.debug(f"设备 {device_name} ({ip}) 无法访问: {e}")
return device, False
except socket.timeout:
self.logger.debug(f"设备 {device_name} ({ip}) 检测超时")
return device, False
except Exception as e:
self.logger.debug(f"设备 {device_name} ({ip}) 检测出错: {e}")
return device, False
# 使用线程池并发检测设备
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# 提交所有检测任务
future_to_device = {
executor.submit(check_single_device, device): device
for device in devices
}
# 收集结果
for future in as_completed(future_to_device):
try:
device, is_online = future.result(timeout=timeout + 1)
if is_online:
online_devices.append(device)
self.logger.info(f"设备:{device.get('friendly_name')}[{device.get('ip')}]在线")
except Exception as e:
device = future_to_device[future]
self.logger.error(f"检测设备 {device.get('friendly_name', '未知')} 时出错: {e}")
return online_devices
def test_raw_discovery(self, timeout=5):
"""
测试原始的SSDP发现查看所有响应
"""
ssdp_msg = (
"M-SEARCH * HTTP/1.1\r\n"
"HOST: 239.255.255.250:1900\r\n"
"MAN: \"ssdp:discover\"\r\n"
"MX: 3\r\n"
"ST: ssdp:all\r\n" # 搜索所有设备
"USER-AGENT: Python DLNA Discovery\r\n"
"\r\n"
)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.bind(('', 0))
# 发送到所有可能的端口
sock.sendto(ssdp_msg.encode(), ('239.255.255.250', 1900))
sock.settimeout(timeout)
responses = []
start_time = time.time()
while time.time() - start_time < timeout:
try:
data, addr = sock.recvfrom(4096)
response = data.decode('utf-8', errors='ignore')
responses.append((addr, response))
# 打印原始响应
print(f"\n收到来自 {addr} 的响应:")
print(response[:200] + "..." if len(response) > 200 else response)
# 尝试解析USN
lines = response.split('\r\n')
for line in lines:
if 'USN:' in line or 'ST:' in line or 'LOCATION:' in line:
print(f" {line.strip()}")
except socket.timeout:
break
except Exception as e:
print(f"接收错误: {e}")
sock.close()
print(f"\n总共收到 {len(responses)} 个原始响应")
return responses
def main():
discovery = DeviceDiscovery()
devices = discovery.discover_media_renderers()
discovery.display_discovered_devices()
print(devices)
if __name__ == '__main__':
main()

658
core/dlna_controller.py Normal file
View File

@@ -0,0 +1,658 @@
import urllib.parse
import xml.etree.ElementTree as ET
import requests
from urllib.parse import urlparse
from core import AppLogger
class DLNAController:
"""
DLNA 媒体渲染器控制器
只负责控制已发现的设备,不包含发现功能
"""
def __init__(self, device_info):
"""
初始化控制器
Args:
device_info: 设备信息字典,必须包含:
- 'location': 设备描述文档的URL
- 'friendly_name': 设备友好名称 (可选)
- 'ip': 设备IP地址 (可选)
"""
if 'location' not in device_info:
raise ValueError("device_info 必须包含 'location' 字段")
self.device_info = device_info
self.location = device_info['location']
self.base_url = None
self.av_transport_url = None
self.rendering_control_url = None
self.connection_manager_url = None
# 解析设备信息
self._parse_device_info()
self.logger = AppLogger('DLNAController')
def _parse_device_info(self):
"""
解析设备描述文档提取控制URL
"""
try:
# 获取基础URL
parsed = urlparse(self.location)
self.base_url = f"{parsed.scheme}://{parsed.netloc}"
# 获取设备描述文档
response = requests.get(self.location, timeout=5)
response.raise_for_status()
# 解析XML
root = ET.fromstring(response.content)
ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'}
# 查找服务列表
service_list = root.find('.//upnp:device/upnp:serviceList', ns)
if service_list is None:
raise ValueError("设备描述中未找到服务列表")
# 遍历服务提取控制URL
for service in service_list.findall('upnp:service', ns):
service_type_elem = service.find('upnp:serviceType', ns)
control_url_elem = service.find('upnp:controlURL', ns)
if service_type_elem is None or control_url_elem is None:
continue
service_type = service_type_elem.text
control_url = control_url_elem.text
# 确保URL是完整的
if control_url.startswith('/'):
control_url = f"{self.base_url}{control_url}"
if 'AVTransport' in service_type:
self.av_transport_url = control_url
elif 'RenderingControl' in service_type:
self.rendering_control_url = control_url
elif 'ConnectionManager' in service_type:
self.connection_manager_url = control_url
# 验证必要的服务是否存在
if not self.av_transport_url:
raise ValueError("设备不支持 AVTransport 服务")
except requests.exceptions.RequestException as e:
raise ConnectionError(f"无法连接设备: {e}")
except ET.ParseError as e:
raise ValueError(f"设备描述文档格式错误: {e}")
except Exception as e:
raise RuntimeError(f"解析设备信息失败: {e}")
def _send_soap_request(self, service_url, action, body):
"""
发送SOAP请求到设备
Args:
service_url: 服务控制URL
action: SOAP Action
body: SOAP Body内容
Returns:
tuple: (success, response_xml) - 成功标志和响应XML
"""
soap_envelope = f"""<?xml version="1.0" encoding="utf-8"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"
s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
{body}
</s:Body>
</s:Envelope>"""
headers = {
'Content-Type': 'text/xml; charset="utf-8"',
'SOAPAction': f'"{action}"',
'User-Agent': 'Python DLNA Controller'
}
try:
response = requests.post(service_url, data=soap_envelope,
headers=headers, timeout=5)
response.raise_for_status()
return True, response.text
except requests.exceptions.Timeout:
return False, "请求超时"
except requests.exceptions.ConnectionError:
return False, "连接失败"
except Exception as e:
return False, str(e)
# ================== AVTransport 服务方法 ==================
def set_av_transport_uri(self, media_url, metadata=""):
"""
设置播放URI
Args:
media_url: 媒体文件的URL
metadata: 媒体元数据(可选)
Returns:
bool: 是否成功
"""
if not self.av_transport_url:
return False
# 转义XML特殊字符
media_url_escaped = media_url.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
media_url_escaped = urllib.parse.quote(media_url_escaped,safe=':/')
metadata_escaped = metadata.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
soap_body = f"""<u:SetAVTransportURI xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
<CurrentURI>{media_url_escaped}</CurrentURI>
<CurrentURIMetaData>{metadata_escaped}</CurrentURIMetaData>
</u:SetAVTransportURI>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
self.logger.info(f"媒体播放地址:{media_url_escaped}")
self.logger.info(f"DLNA控制响应{response}")
return success
def play(self, speed="1"):
"""
开始播放
Args:
speed: 播放速度默认为1
Returns:
bool: 是否成功
"""
if not self.av_transport_url:
return False
soap_body = f"""<u:Play xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
<Speed>{speed}</Speed>
</u:Play>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#Play"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
return success
def pause(self):
"""
暂停播放
Returns:
bool: 是否成功
"""
if not self.av_transport_url:
return False
soap_body = """<u:Pause xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
</u:Pause>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#Pause"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
return success
def stop(self):
"""
停止播放
Returns:
bool: 是否成功
"""
if not self.av_transport_url:
return False
soap_body = """<u:Stop xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
</u:Stop>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#Stop"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
return success
def seek(self, unit, target):
"""
跳转到指定位置
Args:
unit: 时间单位,如 "REL_TIME"(相对时间)或 "TRACK_NR"(曲目号)
target: 目标位置,如 "00:05:30""2"第2首
Returns:
bool: 是否成功
"""
if not self.av_transport_url:
return False
target_escaped = target.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
soap_body = f"""<u:Seek xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
<Unit>{unit}</Unit>
<Target>{target_escaped}</Target>
</u:Seek>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#Seek"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
return success
def get_transport_info(self):
"""
获取传输状态信息
Returns:
dict: 传输状态信息,包含:
- state: 当前传输状态
- status: 当前传输状态
- speed: 当前播放速度
None: 获取失败
"""
if not self.av_transport_url:
return None
soap_body = """<u:GetTransportInfo xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
</u:GetTransportInfo>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#GetTransportInfo"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
if not success:
return None
try:
# 解析响应XML
root = ET.fromstring(response)
ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'urn:schemas-upnp-org:service:AVTransport:1'}
# 查找Body中的响应
body = root.find('.//s:Body', ns)
if body is None:
return None
# 获取状态信息
current_transport_state = body.find('.//CurrentTransportState')
current_transport_status = body.find('.//CurrentTransportStatus')
current_speed = body.find('.//CurrentSpeed')
return {
'state': current_transport_state.text if current_transport_state is not None else '未知',
'status': current_transport_status.text if current_transport_status is not None else '未知',
'speed': current_speed.text if current_speed is not None else '未知'
}
except ET.ParseError:
return None
def get_position_info(self):
"""
获取当前位置信息
Returns:
dict: 位置信息,包含:
- track: 当前曲目
- track_duration: 曲目时长
- track_meta_data: 曲目元数据
- track_uri: 曲目URI
- rel_time: 相对时间
- abs_time: 绝对时间
- rel_count: 相对计数
- abs_count: 绝对计数
None: 获取失败
"""
if not self.av_transport_url:
return None
soap_body = """<u:GetPositionInfo xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
</u:GetPositionInfo>"""
action = "urn:schemas-upnp-org:service:AVTransport:1#GetPositionInfo"
success, response = self._send_soap_request(self.av_transport_url, action, soap_body)
if not success:
return None
try:
root = ET.fromstring(response)
ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'urn:schemas-upnp-org:service:AVTransport:1'}
body = root.find('.//s:Body', ns)
if body is None:
return None
# 提取位置信息
info = {}
fields = [
'Track', 'TrackDuration', 'TrackMetaData', 'TrackURI',
'RelTime', 'AbsTime', 'RelCount', 'AbsCount'
]
for field in fields:
elem = body.find(f'.//{field}')
info[field.lower()] = elem.text if elem is not None else ''
return info
except ET.ParseError:
return None
# ================== RenderingControl 服务方法 ==================
def set_volume(self, volume, channel='Master'):
"""
设置音量
Args:
volume: 音量值 (0-100)
channel: 声道,默认为 'Master'
Returns:
bool: 是否成功
"""
if not self.rendering_control_url:
return False
# 确保音量在0-100范围内
volume = max(0, min(100, int(volume)))
soap_body = f"""<u:SetVolume xmlns:u="urn:schemas-upnp-org:service:RenderingControl:1">
<InstanceID>0</InstanceID>
<Channel>{channel}</Channel>
<DesiredVolume>{volume}</DesiredVolume>
</u:SetVolume>"""
action = "urn:schemas-upnp-org:service:RenderingControl:1#SetVolume"
success, response = self._send_soap_request(self.rendering_control_url, action, soap_body)
return success
def get_volume(self, channel='Master'):
"""
获取当前音量
Args:
channel: 声道,默认为 'Master'
Returns:
int: 音量值 (0-100)None表示获取失败
"""
if not self.rendering_control_url:
return None
soap_body = f"""<u:GetVolume xmlns:u="urn:schemas-upnp-org:service:RenderingControl:1">
<InstanceID>0</InstanceID>
<Channel>{channel}</Channel>
</u:GetVolume>"""
action = "urn:schemas-upnp-org:service:RenderingControl:1#GetVolume"
success, response = self._send_soap_request(self.rendering_control_url, action, soap_body)
if not success:
return None
try:
root = ET.fromstring(response)
ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'urn:schemas-upnp-org:service:RenderingControl:1'}
body = root.find('.//s:Body', ns)
if body is None:
return None
current_volume = body.find('.//CurrentVolume')
if current_volume is not None and current_volume.text:
return int(current_volume.text)
except (ET.ParseError, ValueError):
pass
return None
def set_mute(self, mute, channel='Master'):
"""
设置静音
Args:
mute: 是否静音 (True/False)
channel: 声道,默认为 'Master'
Returns:
bool: 是否成功
"""
if not self.rendering_control_url:
return False
desired_mute = "1" if mute else "0"
soap_body = f"""<u:SetMute xmlns:u="urn:schemas-upnp-org:service:RenderingControl:1">
<InstanceID>0</InstanceID>
<Channel>{channel}</Channel>
<DesiredMute>{desired_mute}</DesiredMute>
</u:SetMute>"""
action = "urn:schemas-upnp-org:service:RenderingControl:1#SetMute"
success, response = self._send_soap_request(self.rendering_control_url, action, soap_body)
return success
def get_mute(self, channel='Master'):
"""
获取静音状态
Args:
channel: 声道,默认为 'Master'
Returns:
bool: 是否静音None表示获取失败
"""
if not self.rendering_control_url:
return None
soap_body = f"""<u:GetMute xmlns:u="urn:schemas-upnp-org:service:RenderingControl:1">
<InstanceID>0</InstanceID>
<Channel>{channel}</Channel>
</u:GetMute>"""
action = "urn:schemas-upnp-org:service:RenderingControl:1#GetMute"
success, response = self._send_soap_request(self.rendering_control_url, action, soap_body)
if not success:
return None
try:
root = ET.fromstring(response)
ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'urn:schemas-upnp-org:service:RenderingControl:1'}
body = root.find('.//s:Body', ns)
if body is None:
return None
current_mute = body.find('.//CurrentMute')
if current_mute is not None and current_mute.text:
return current_mute.text == "1"
except ET.ParseError:
pass
return None
# ================== ConnectionManager 服务方法 ==================
def get_protocol_info(self):
"""
获取设备支持的协议信息
Returns:
dict: 协议信息,包含:
- source: 源协议信息
- sink: 接收协议信息
None: 获取失败
"""
if not self.connection_manager_url:
return None
soap_body = """<u:GetProtocolInfo xmlns:u="urn:schemas-upnp-org:service:ConnectionManager:1">
</u:GetProtocolInfo>"""
action = "urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo"
success, response = self._send_soap_request(self.connection_manager_url, action, soap_body)
if not success:
return None
try:
root = ET.fromstring(response)
ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'urn:schemas-upnp-org:service:ConnectionManager:1'}
body = root.find('.//s:Body', ns)
if body is None:
return None
source = body.find('.//Source')
sink = body.find('.//Sink')
return {
'source': source.text if source is not None else '',
'sink': sink.text if sink is not None else ''
}
except ET.ParseError:
return None
# ================== 高级方法 ==================
def play_media(self, media_url, metadata="", wait=1):
"""
播放媒体文件(组合操作)
Args:
media_url: 媒体文件URL
metadata: 媒体元数据
wait: 设置URI后的等待时间
Returns:
bool: 是否成功
"""
import time
if self.set_av_transport_uri(media_url, metadata):
time.sleep(wait)
return self.play()
return False
def get_device_status(self):
"""
获取设备完整状态
Returns:
dict: 设备状态信息
"""
status = {
'device_info': {
'friendly_name': self.device_info.get('friendly_name', '未知'),
'ip': self.device_info.get('ip', '未知'),
'location': self.device_info.get('location', '未知')
},
'transport_info': self.get_transport_info(),
'position_info': self.get_position_info(),
'volume': self.get_volume(),
'mute': self.get_mute(),
'protocol_info': self.get_protocol_info(),
'services': {
'av_transport': self.av_transport_url is not None,
'rendering_control': self.rendering_control_url is not None,
'connection_manager': self.connection_manager_url is not None
}
}
return status
def get_supported_formats(self):
"""
获取设备支持的媒体格式
Returns:
list: 支持的格式列表
"""
protocol_info = self.get_protocol_info()
if not protocol_info or 'sink' not in protocol_info:
return []
# 解析sink字段提取支持的格式
sink_info = protocol_info['sink']
formats = []
# DLNA格式通常是逗号分隔的
for fmt in sink_info.split(','):
fmt = fmt.strip()
if fmt:
formats.append(fmt)
return formats
def can_play_format(self, media_url):
"""
检查设备是否可能支持指定格式
Args:
media_url: 媒体URL
Returns:
bool: 是否可能支持
"""
import mimetypes
# 获取文件扩展名
if '.' in media_url:
ext = media_url.split('.')[-1].lower()
# 常见扩展名到MIME类型的映射
ext_to_mime = {
'mp4': 'video/mp4',
'mp3': 'audio/mp3',
'm4a': 'audio/mp4',
'wav': 'audio/wav',
'flac': 'audio/flac',
'mkv': 'video/x-matroska',
'avi': 'video/x-msvideo',
'mov': 'video/quicktime',
'wmv': 'video/x-ms-wmv',
'flv': 'video/x-flv',
'webm': 'video/webm',
'ogg': 'audio/ogg',
'oga': 'audio/ogg',
'ogv': 'video/ogg',
'm3u8': 'application/x-mpegURL',
'mpd': 'application/dash+xml'
}
mime_type = ext_to_mime.get(ext)
if mime_type:
# 检查设备是否支持该MIME类型
supported_formats = self.get_supported_formats()
for fmt in supported_formats:
if mime_type in fmt:
return True
return False

693
core/http_file_server.py Normal file
View File

@@ -0,0 +1,693 @@
# core/http_file_server.py
"""
支持流式传输的HTTP文件服务器适合大文件投屏
支持动态修改根目录和流式传输
"""
import http.server
import socketserver
import threading
import os
import urllib.parse
import mimetypes
import time
from typing import Optional, Tuple
from core.logger import AppLogger
class StreamingHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
"""支持流式传输的HTTP请求处理器"""
# 协议版本
protocol_version = 'HTTP/1.1'
# 支持的MIME类型映射
VIDEO_MIME_TYPES = {
'.mp4': 'video/mp4',
'.mkv': 'video/x-matroska',
'.avi': 'video/x-msvideo',
'.mov': 'video/quicktime',
'.wmv': 'video/x-ms-wmv',
'.flv': 'video/x-flv',
'.webm': 'video/webm',
'.m4v': 'video/x-m4v',
'.3gp': 'video/3gpp',
'.ts': 'video/mp2t',
'.m2ts': 'video/mp2t',
'.mts': 'video/mp2t',
}
AUDIO_MIME_TYPES = {
'.mp3': 'audio/mpeg',
'.wav': 'audio/wav',
'.flac': 'audio/flac',
'.aac': 'audio/aac',
'.ogg': 'audio/ogg',
'.m4a': 'audio/mp4',
'.wma': 'audio/x-ms-wma',
}
IMAGE_MIME_TYPES = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.webp': 'image/webp',
}
def __init__(self, *args, **kwargs):
self.extensions_map = {}
self.extensions_map.update(self.VIDEO_MIME_TYPES)
self.extensions_map.update(self.AUDIO_MIME_TYPES)
self.extensions_map.update(self.IMAGE_MIME_TYPES)
# 文本和其他类型
self.extensions_map.update({
'.html': 'text/html',
'.htm': 'text/html',
'.css': 'text/css',
'.js': 'application/javascript',
'.json': 'application/json',
'.txt': 'text/plain',
'.xml': 'application/xml',
'.pdf': 'application/pdf',
'.ico': 'image/x-icon',
'.svg': 'image/svg+xml',
})
self.logger = AppLogger('StreamingHTTPServer')
super().__init__(*args, **kwargs)
def get_server_root(self) -> str:
"""获取服务器当前根目录"""
if hasattr(self.server, 'current_directory'):
return self.server.current_directory
return "."
def translate_path(self, path: str) -> str:
"""
转换URL路径为文件系统路径
Args:
path: URL路径
Returns:
文件系统路径,如果无效则返回空字符串
"""
try:
# 解析URL
parsed_path = urllib.parse.urlparse(path)
url_path = urllib.parse.unquote(parsed_path.path)
# 获取根目录
root_dir = self.get_server_root()
# 去掉开头的斜杠
if url_path.startswith('/'):
url_path = url_path[1:]
# 安全检查:防止目录遍历攻击
if '..' in url_path:
self.logger.warning(f"检测到目录遍历攻击: {path}")
return ""
# 构建完整路径
full_path = os.path.join(root_dir, url_path)
# 标准化路径
full_path = os.path.normpath(full_path)
# 再次安全检查:确保路径在根目录下
abs_root = os.path.abspath(root_dir)
abs_path = os.path.abspath(full_path)
if not abs_path.startswith(abs_root):
self.logger.warning(f"路径超出根目录范围: {full_path}")
return ""
return full_path
except Exception as e:
self.logger.error(f"路径转换失败: {e}")
return ""
def guess_mime_type(self, file_path: str) -> str:
"""
根据文件扩展名猜测MIME类型
Args:
file_path: 文件路径
Returns:
MIME类型字符串
"""
# 获取扩展名
_, ext = os.path.splitext(file_path)
ext = ext.lower()
# 使用自定义映射
if ext in self.extensions_map:
return self.extensions_map[ext]
# 使用系统猜测
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type:
return mime_type
# 默认类型
return 'application/octet-stream'
def parse_range_header(self, range_header: str, file_size: int) -> Optional[Tuple[int, int]]:
"""
解析Range请求头
Args:
range_header: Range头值
file_size: 文件大小
Returns:
(start, end)元组或None表示无效范围
"""
if not range_header:
return None
try:
# 格式: bytes=start-end
if range_header.startswith('bytes='):
range_str = range_header[6:] # 去掉'bytes='
if '-' in range_str:
start_str, end_str = range_str.split('-', 1)
start = 0
end = file_size - 1
if start_str:
start = int(start_str)
if end_str:
end = int(end_str)
else:
# 格式: bytes=start-
end = file_size - 1
# 验证范围
if start < 0 or end >= file_size or start > end:
return None
return (start, end)
except Exception as e:
self.logger.error(f"解析Range头失败: {e}")
return None
def send_file_streaming(self, file_path: str, start_pos: int = 0, end_pos: Optional[int] = None):
"""
流式发送文件内容
Args:
file_path: 文件路径
start_pos: 开始位置
end_pos: 结束位置None表示到文件末尾
"""
try:
file_size = os.path.getsize(file_path)
# 计算实际发送范围
if end_pos is None:
end_pos = file_size - 1
content_length = end_pos - start_pos + 1
# 打开文件
with open(file_path, 'rb') as f:
# 定位到开始位置
f.seek(start_pos)
# 设置传输编码为分块传输Chunked
self.send_response(200 if start_pos == 0 and end_pos == file_size - 1 else 206)
# 对于范围请求发送Content-Range头
if start_pos > 0 or end_pos < file_size - 1:
self.send_header('Content-Range', f'bytes {start_pos}-{end_pos}/{file_size}')
self.send_header('Accept-Ranges', 'bytes')
# 设置Content-Length而不是使用Transfer-Encoding: chunked
# 这样可以支持视频播放器的进度控制和跳转
self.send_header('Content-Length', str(content_length))
self.send_header('Content-Type', self.guess_mime_type(file_path))
self.send_header('Accept-Ranges', 'bytes')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Range')
self.send_header('Access-Control-Expose-Headers', 'Content-Length, Content-Range')
# 缓存控制头
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
self.end_headers()
# 流式发送数据
remaining = content_length
chunk_size = 1024 * 256 # 256KB chunk size for streaming
while remaining > 0:
# 读取数据块
read_size = min(chunk_size, remaining)
data = f.read(read_size)
if not data:
break
# 发送数据块
try:
self.wfile.write(data)
self.wfile.flush() # 确保数据立即发送
except (ConnectionError, BrokenPipeError):
self.logger.debug("客户端断开连接")
break
remaining -= len(data)
# 可选小延迟以避免占用太多CPU
if read_size == chunk_size:
time.sleep(0.001) # 1ms延迟
self.logger.debug(f"文件传输完成: {file_path} ({content_length} bytes)")
except Exception as e:
self.logger.error(f"流式发送文件失败: {e}")
raise
def do_GET(self):
"""处理GET请求"""
try:
client_ip = self.client_address[0]
self.logger.debug(f"GET请求: {self.path} from {client_ip}")
# 转换路径
file_path = self.translate_path(self.path)
if not file_path:
self.send_error(404, "文件未找到")
return
# 检查文件是否存在
if not os.path.exists(file_path):
self.send_error(404, f"文件不存在: {self.path}")
return
if not os.path.isfile(file_path):
self.send_error(403, f"不是文件: {self.path}")
return
# 获取文件信息
file_size = os.path.getsize(file_path)
mime_type = self.guess_mime_type(file_path)
# 检查Range请求
range_header = self.headers.get('Range')
range_info = self.parse_range_header(range_header, file_size)
# 如果是视频或音频文件,支持范围请求
is_media_file = any(ext in file_path.lower() for ext in
['.mp4', '.mkv', '.avi', '.mov', '.mp3', '.wav', '.flac'])
if is_media_file and range_info:
# 范围请求
start_pos, end_pos = range_info
self.send_file_streaming(file_path, start_pos, end_pos)
else:
# 完整文件请求
# 对于大文件,也使用流式传输
self.send_file_streaming(file_path, 0, file_size - 1)
except ConnectionError:
self.logger.debug("客户端断开连接")
except Exception as e:
self.logger.error(f"处理GET请求失败: {e}")
if not self.headers_sent:
self.send_error(500, "内部服务器错误")
def do_HEAD(self):
"""处理HEAD请求"""
try:
self.logger.debug(f"HEAD请求: {self.path}")
# 转换路径
file_path = self.translate_path(self.path)
if not file_path:
self.send_error(404, "文件未找到")
return
# 检查文件是否存在
if not os.path.exists(file_path):
self.send_error(404, f"文件不存在: {self.path}")
return
if not os.path.isfile(file_path):
self.send_error(403, f"不是文件: {self.path}")
return
# 获取文件信息
file_size = os.path.getsize(file_path)
mime_type = self.guess_mime_type(file_path)
# 设置响应头
self.send_response(200)
self.send_header('Content-Type', mime_type)
self.send_header('Content-Length', str(file_size))
self.send_header('Accept-Ranges', 'bytes')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS')
self.end_headers()
except Exception as e:
self.logger.error(f"处理HEAD请求失败: {e}")
self.send_error(500, "内部服务器错误")
def do_OPTIONS(self):
"""处理OPTIONS请求CORS预检"""
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Range')
self.send_header('Access-Control-Max-Age', '86400')
self.end_headers()
def log_message(self, format, *args):
"""重写日志消息"""
# 减少日志输出,避免性能影响
message = format % args
if 'code' in message and 'message' in message:
# 只记录错误响应
if '200' not in message:
self.logger.info(f"HTTP响应: {message}")
class StreamingHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""支持流式传输和多线程的HTTP服务器"""
allow_reuse_address = True
daemon_threads = True
def __init__(self, server_address, request_handler_class, initial_directory="."):
"""
初始化流式HTTP服务器
Args:
server_address: 服务器地址和端口
request_handler_class: 请求处理器类
initial_directory: 初始根目录
"""
self.current_directory = os.path.abspath(initial_directory)
self.request_queue_size = 50 # 增加请求队列大小
super().__init__(server_address, request_handler_class)
def set_directory(self, directory: str) -> bool:
"""
设置新的根目录
Args:
directory: 新的根目录
Returns:
是否设置成功
"""
new_dir = os.path.dirname(directory)
if os.path.exists(directory) and os.path.isdir(new_dir):
old_dir = self.current_directory
self.current_directory = new_dir
return True
return False
# 设置新的根目录
new_dir = os.path.abspath(directory)
old_dir = self.httpd.current_directory
self.httpd.current_directory = new_dir
self.logger.info(f"HTTP服务器根目录已更改: {old_dir} -> {new_dir}")
class StreamingFileServer:
"""流式文件服务器管理器"""
def __init__(self, port: int = 8000, bind_address: str = "0.0.0.0"):
"""
初始化流式文件服务器
Args:
port: 服务器端口
bind_address: 绑定地址
"""
self.port = port
self.bind_address = bind_address
self.httpd: Optional[StreamingHTTPServer] = None
self.server_thread: Optional[threading.Thread] = None
self.is_running = False
self.logger = AppLogger.create_default_logger('StreamingFileServer')
# 初始化MIME类型
mimetypes.init()
def start(self, directory: str = ".") -> bool:
"""
启动流式文件服务器
Args:
directory: 初始根目录
Returns:
是否启动成功
"""
try:
if self.is_running:
self.logger.warning("服务器已经在运行")
return False
# 检查目录
if not os.path.exists(directory):
self.logger.error(f"目录不存在: {directory}")
return False
if not os.path.isdir(directory):
self.logger.error(f"路径不是目录: {directory}")
return False
# 创建服务器
self.httpd = StreamingHTTPServer(
(self.bind_address, self.port),
StreamingHTTPRequestHandler,
initial_directory=directory
)
# 设置超时(秒)
self.httpd.timeout = 30
# 启动服务器线程
self.server_thread = threading.Thread(
target=self._run_server,
daemon=True,
name="StreamingHTTPServer"
)
self.server_thread.start()
self.is_running = True
self.logger.info(f"流式文件服务器已启动: http://{self.bind_address}:{self.port}")
self.logger.info(f"初始根目录: {directory}")
self.logger.info("服务器支持流式传输和断点续传")
return True
except Exception as e:
self.logger.error(f"启动服务器失败: {e}")
return False
def _run_server(self):
"""运行服务器线程"""
try:
self.logger.info(f"服务器监听在 {self.bind_address}:{self.port}")
self.httpd.serve_forever()
except Exception as e:
if self.is_running:
self.logger.error(f"服务器错误: {e}")
finally:
self.is_running = False
def set_root_directory(self, directory: str) -> bool:
"""
动态设置根目录
Args:
directory: 新的根目录
Returns:
是否设置成功
"""
if not self.is_running or not self.httpd:
self.logger.error("服务器未运行")
return False
try:
if not os.path.exists(directory):
self.logger.error(f"目录不存在: {directory}")
return False
if self.httpd.set_directory(directory):
self.logger.info(f"根目录已更改为: {directory}")
return True
else:
self.logger.error(f"设置根目录失败: {directory}")
return False
except Exception as e:
self.logger.error(f"设置根目录时出错: {e}")
return False
def get_root_directory(self) -> str:
"""获取当前根目录"""
if self.httpd:
return self.httpd.current_directory
return "."
def stop(self):
"""停止服务器"""
try:
self.is_running = False
if self.httpd:
self.httpd.shutdown()
self.httpd.server_close()
self.logger.info("流式文件服务器已停止")
except Exception as e:
self.logger.error(f"停止服务器时出错: {e}")
def get_file_url(self, file_path: str) -> Optional[str]:
"""
获取文件的HTTP URL
Args:
file_path: 文件路径(绝对或相对)
Returns:
文件的HTTP URL如果无效则返回None
"""
try:
# 获取当前根目录
root_dir = self.get_root_directory()
# 如果是绝对路径
if os.path.isabs(file_path):
abs_file_path = os.path.abspath(file_path)
abs_root_dir = os.path.abspath(root_dir)
# 确保文件在根目录下
if not abs_file_path.startswith(abs_root_dir):
self.logger.warning(f"文件不在当前根目录下: {file_path}")
return None
# 获取相对路径
relative_path = os.path.relpath(abs_file_path, abs_root_dir)
else:
# 相对路径
relative_path = file_path
# 检查文件是否存在
full_path = os.path.join(root_dir, relative_path)
if not os.path.exists(full_path) or not os.path.isfile(full_path):
self.logger.warning(f"文件不存在: {full_path}")
return None
# URL编码路径
encoded_path = urllib.parse.quote(relative_path.replace('\\', '/'))
# 构建URL
return f"http://{self.bind_address}:{self.port}/{encoded_path}"
except Exception as e:
self.logger.error(f"生成文件URL失败: {e}")
return None
def get_server_info(self) -> dict:
"""获取服务器信息"""
return {
'is_running': self.is_running,
'bind_address': self.bind_address,
'port': self.port,
'root_directory': self.get_root_directory(),
'url': f"http://{self.bind_address}:{self.port}"
}
# 高级流式服务器类
class HTTPFileServer(StreamingFileServer):
"""高级流式服务器,支持更多功能"""
def __init__(self, port: int = 8000, bind_address: str = "0.0.0.0",
max_chunk_size: int = 1024 * 256): # 256KB
"""
初始化高级流式服务器
Args:
port: 服务器端口
bind_address: 绑定地址
max_chunk_size: 最大块大小(字节)
"""
super().__init__(port, bind_address)
self.max_chunk_size = max_chunk_size
self.active_connections = 0
self.total_served_bytes = 0
self.stats_lock = threading.Lock()
def _run_server(self):
"""运行服务器线程,包含统计信息"""
self.logger.info(f"高级流式服务器启动,块大小: {self.max_chunk_size}字节")
super()._run_server()
def get_stats(self) -> dict:
"""获取服务器统计信息"""
with self.stats_lock:
return {
'active_connections': self.active_connections,
'total_served_bytes': self.total_served_bytes,
'is_running': self.is_running,
'root_directory': self.get_root_directory()
}
# 使用示例
def test_streaming_server():
"""测试流式服务器"""
import time
# 创建服务器
server = HTTPFileServer(port=8080)
print("启动流式文件服务器...")
# 启动服务器
if server.start("."):
print(f"服务器已启动: http://localhost:8080")
print(f"当前目录: {server.get_root_directory()}")
# 测试获取文件URL
test_file = "test.mp4" # 假设存在一个测试文件
file_url = server.get_file_url(test_file)
if file_url:
print(f"文件URL: {file_url}")
print("你可以使用以下命令测试:")
print(f" curl -I {file_url} # 查看文件信息")
print(f" curl --range 0-999 {file_url} # 测试范围请求")
# 运行一段时间
try:
while True:
time.sleep(1)
stats = server.get_stats()
print(f"\r活跃连接: {stats['active_connections']}, "
f"已传输: {stats['total_served_bytes'] / 1024 / 1024:.2f} MB", end='')
except KeyboardInterrupt:
print("\n\n正在停止服务器...")
finally:
server.stop()
print("服务器已停止")
else:
print("启动服务器失败")
if __name__ == "__main__":
test_streaming_server()

83
core/logger.py Normal file
View File

@@ -0,0 +1,83 @@
# logger.py
import logging
import sys
from datetime import datetime
import os
class AppLogger:
"""
应用程序日志工具类
提供统一的日志记录功能
"""
def __init__(self, name='', log_level=logging.INFO, log_to_file=False, log_file_path=None):
"""
初始化日志记录器
Args:
name: 日志记录器名称
log_level: 日志级别
log_to_file: 是否记录到文件
log_file_path: 日志文件路径如果为None则使用默认路径
"""
self.logger = logging.getLogger(name)
# 避免重复添加处理器
if self.logger.handlers:
return
self.logger.setLevel(log_level)
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 创建控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# 如果需要记录到文件
if log_to_file:
if log_file_path is None:
# 创建logs目录
if not os.path.exists('logs'):
os.makedirs('logs')
# 生成日志文件名
timestamp = datetime.now().strftime('%Y%m%d')
log_file_path = f'logs/{timestamp}.log'
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def info(self, message):
"""记录信息级别日志"""
self.logger.info(message)
def debug(self, message):
"""记录调试级别日志"""
self.logger.debug(message)
def warning(self, message):
"""记录警告级别日志"""
self.logger.warning(message)
def error(self, message):
"""记录错误级别日志"""
self.logger.error(message)
def critical(self, message):
"""记录严重级别日志"""
self.logger.critical(message)
def set_level(self, level):
"""设置日志级别"""
self.logger.setLevel(level)
for handler in self.logger.handlers:
handler.setLevel(level)

BIN
icon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB

30
main.py Normal file
View File

@@ -0,0 +1,30 @@
# main.py
import os
import sys
from PyQt5.QtWidgets import QApplication
# 获取当前文件所在目录main.py所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir) # 添加当前目录到搜索路径
from ui.main_window import MainWindow
def main():
"""应用主函数"""
app = QApplication(sys.argv)
# 设置应用样式
app.setStyle("Fusion")
# 创建并显示主窗口
window = MainWindow()
window.show()
# 启动应用事件循环
sys.exit(app.exec_())
if __name__ == "__main__":
main()

11
ui/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""
用户界面模块
包含主窗口和各种UI组件。
"""
__all__ = [
"MainWindow",
]
# 导入UI组件
from .main_window import MainWindow

841
ui/main_window.py Normal file
View File

@@ -0,0 +1,841 @@
import sys
import os
import threading
import socket
from datetime import datetime
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QGroupBox, QFrame, QPushButton, \
QListWidget, QSlider, QTextEdit, QFileDialog, QMessageBox, QListWidgetItem
from core.device_discovery import DeviceDiscovery
from core.dlna_controller import DLNAController
from core.http_file_server import HTTPFileServer
from core.logger import AppLogger
class MainWindow(QMainWindow):
"""智能媒体投屏器主窗口"""
def __init__(self):
super().__init__()
self.device_discovery = None
self.dlna_controller = None
self.http_server = None
self.selected_device = None
self.selected_file = None
self.discovered_devices = []
self.logger = AppLogger('MediaCastUI')
self.http_port = 19735
self.is_paused = False # 新增:跟踪暂停状态
self.init_ui()
self.init_connections()
def init_ui(self):
"""初始化UI界面"""
version = "v1.0.1"
app_name = "多媒体投屏 by yqsphp"
icon = "icon.ico"
self.setWindowTitle(app_name)
self.setGeometry(100, 100, 700, 500)
# 获取图标路径(支持打包和开发环境)
if getattr(sys, 'frozen', False):
# 打包后的exe环境
base_path = sys._MEIPASS
else:
# 开发环境
base_path = os.path.abspath(".")
icon_path = os.path.join(base_path, icon)
# 设置应用程序图标(影响任务栏)
self.setWindowIcon(QIcon(icon_path))
# Windows专用设置AppUserModelID非常重要
if sys.platform == "win32":
try:
from ctypes import windll
# 唯一ID格式公司名.程序名.版本
myappid = app_name + " " + version
windll.shell32.SetCurrentProcessExplicitAppUserModelID(myappid)
except ImportError:
pass
# 创建中央部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 主布局
main_layout = QVBoxLayout(central_widget)
main_layout.setSpacing(10)
main_layout.setContentsMargins(10, 10, 10, 10)
# 水平布局用于左右面板
horizontal_layout = QHBoxLayout()
horizontal_layout.setSpacing(10)
# 左侧控制面板
left_panel = self.create_left_panel()
horizontal_layout.addWidget(left_panel, 1)
# 右侧信息面板
# right_panel = self.create_right_panel()
# horizontal_layout.addWidget(right_panel, 1)
# 将水平布局添加到垂直布局中
main_layout.addLayout(horizontal_layout)
# 状态栏
self.statusBar().showMessage("准备就绪")
# 添加弹性空间
main_layout.addStretch()
# 底部版权信息
copyright_label = QLabel(f"© {app_name} {version}")
copyright_label.setAlignment(Qt.AlignCenter)
copyright_label.setStyleSheet("""
color: #999;
font-size: 11px;
padding: 15px 0 5px 0;
border-top: 1px solid #eee;
margin-top: 10px;
""")
main_layout.addWidget(copyright_label)
# 设置扁平化样式
self.setStyleSheet("""
/* 主窗口 */
QMainWindow {
background-color: #f5f5f5;
font-family: 'Microsoft YaHei', 'Segoe UI', sans-serif;
}
/* 分组框 - 扁平化 */
QGroupBox {
font-size: 13px;
border: 1px solid #ddd;
border-radius: 6px;
margin-top: 8px;
padding-top: 8px;
background-color: white;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 8px 0 8px;
color: #333;
}
/* 按钮 - 扁平化 */
QPushButton {
border: none;
border-radius: 4px;
padding: 13px 10px;
font-size: 15px;
color: white;
background-color: #6c757d;
}
QPushButton:disabled {
color: #999;
background-color: #f5f5f5;
}
/* 特殊按钮样式 */
#browseBtn{
background:white;
border: 1px solid #ddd;
color:blank;
}
#refreshBtn {
background-color: #28a745;
color: white;
border: none;
}
#startCastBtn {
background-color: #007bff;
color: white;
border: none;
}
#pauseCastBtn {
background-color: #ffc107;
color: white;
border: none;
}
#stopCastBtn {
background-color: #dc3545;
color: white;
border: none;
}
/*音量图标*/
#muteBtn{
background-color:none;
}
/* 列表控件 */
QListWidget {
border: 1px solid #ddd;
border-radius: 4px;
background-color: white;
padding: 4px;
}
QListWidget::item {
padding: 6px 8px;
border-radius: 4px;
border-bottom: 1px solid #f0f0f0;
}
QListWidget::item:selected {
color: white;
background-color: #007bff;
}
/* 文本编辑框 */
QTextEdit {
border: 1px solid #ddd;
border-radius: 4px;
background-color: white;
font-size: 11.5px;
padding: 0 4px;
}
/* 标签 */
QLabel {
color: #333;
font-size: 12px;
}
/* 输入框 */
QLineEdit {
border: 1px solid #ddd;
border-radius: 4px;
padding: 0 8px;
font-size: 12px;
}
""")
def create_left_panel(self):
"""创建左侧控制面板(文件和设备控制合并)"""
panel = QWidget()
layout = QVBoxLayout(panel)
layout.setSpacing(10)
# ==================== 系统信息区域 ====================
system_group = QGroupBox("系统信息")
system_layout = QVBoxLayout()
system_layout.setSpacing(8)
# 获取本机IP
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
except:
local_ip = "获取失败"
# 服务器状态
server_frame = QFrame()
server_layout = QHBoxLayout(server_frame)
server_layout.addWidget(QLabel("文件服务:"))
self.ip_label = QLabel(f"{local_ip}:{self.http_port}")
server_layout.addWidget(self.ip_label, 1)
system_layout.addWidget(server_frame)
# 文件服务状态
self.file_service_label = QLabel("就绪")
self.file_service_label.setStyleSheet("color: #28a745;")
server_layout.addWidget(self.file_service_label, 1)
system_group.setLayout(system_layout)
layout.addWidget(system_group)
# ==================== 文件和设备控制区域 ====================
control_group = QGroupBox("文件与设备控制")
control_layout = QVBoxLayout()
control_layout.setSpacing(12)
# 文件选择区域
file_frame = QFrame()
file_layout = QHBoxLayout(file_frame)
file_layout.setSpacing(8)
self.file_label = QLabel("未选择文件")
self.file_label.setWordWrap(True)
self.file_label.setMinimumHeight(25)
self.file_label.setStyleSheet("""
border: 1px solid #dee2e6;
border-radius: 4px;
padding-left: 5px;
""")
file_layout.addWidget(self.file_label, 1)
browse_btn = QPushButton("浏览文件")
browse_btn.setCursor(Qt.PointingHandCursor)
browse_btn.setFixedWidth(80)
browse_btn.setObjectName("browseBtn")
browse_btn.clicked.connect(self.browse_file)
file_layout.addWidget(browse_btn)
control_layout.addWidget(file_frame)
# 设备列表区域
device_frame = QFrame()
device_layout = QVBoxLayout(device_frame)
device_layout.setSpacing(8)
# 设备列表标题和刷新按钮
device_header = QHBoxLayout()
device_header.addWidget(QLabel("可用设备列表:"))
device_header.addStretch()
device_layout.addLayout(device_header)
# 设备列表
self.device_list = QListWidget()
self.device_list.itemClicked.connect(self.select_device_from_list)
self.device_list.setMinimumHeight(100)
device_layout.addWidget(self.device_list)
control_layout.addWidget(device_frame)
# 播放控制按钮
playback_frame = QFrame()
playback_layout = QHBoxLayout(playback_frame)
playback_layout.setSpacing(8)
self.refresh_btn = QPushButton("刷新设备")
self.refresh_btn.setObjectName("refreshBtn")
self.refresh_btn.setCursor(Qt.PointingHandCursor)
self.refresh_btn.clicked.connect(self.refresh_devices)
self.start_btn = QPushButton("开始投屏")
self.start_btn.setObjectName("startCastBtn")
self.start_btn.setCursor(Qt.PointingHandCursor)
self.start_btn.clicked.connect(self.start_casting)
self.pause_btn = QPushButton("暂停投屏")
self.pause_btn.setObjectName("pauseCastBtn")
self.pause_btn.setCursor(Qt.PointingHandCursor)
self.pause_btn.clicked.connect(self.pause_casting)
self.pause_btn.setEnabled(False)
self.stop_btn = QPushButton("结束投屏")
self.stop_btn.setObjectName("stopCastBtn")
self.stop_btn.setCursor(Qt.PointingHandCursor)
self.stop_btn.clicked.connect(self.stop_casting)
self.stop_btn.setEnabled(False)
playback_layout.addWidget(self.refresh_btn)
playback_layout.addWidget(self.start_btn)
playback_layout.addWidget(self.pause_btn)
playback_layout.addWidget(self.stop_btn)
control_layout.addWidget(playback_frame)
# 音量控制
volume_frame = QFrame()
volume_layout = QHBoxLayout(volume_frame)
volume_layout.setSpacing(8)
volume_layout.addWidget(QLabel("音量:"))
self.volume_slider = QSlider(Qt.Horizontal)
self.volume_slider.setRange(0, 100)
self.volume_slider.setValue(50)
self.volume_slider.setCursor(Qt.PointingHandCursor)
self.volume_slider.valueChanged.connect(self.volume_changed)
volume_layout.addWidget(self.volume_slider, 1)
self.mute_btn = QPushButton("🔊")
self.mute_btn.setCheckable(True)
self.mute_btn.setFixedWidth(40)
self.mute_btn.setObjectName("muteBtn")
self.mute_btn.clicked.connect(self.toggle_mute)
volume_layout.addWidget(self.mute_btn)
control_layout.addWidget(volume_frame)
control_group.setLayout(control_layout)
layout.addWidget(control_group, 1) # 主要区域占据更多空间
layout.addStretch()
return panel
def create_right_panel(self):
"""创建右侧信息面板"""
panel = QWidget()
layout = QVBoxLayout(panel)
layout.setSpacing(10)
# ==================== 系统信息区域 ====================
system_group = QGroupBox("系统信息")
system_layout = QVBoxLayout()
system_layout.setSpacing(8)
# 获取本机IP
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
except:
local_ip = "获取失败"
# 服务器状态
server_frame = QFrame()
server_layout = QHBoxLayout(server_frame)
server_layout.addWidget(QLabel("服务地址:"))
self.ip_label = QLabel(f"{local_ip}:{self.http_port}")
server_layout.addWidget(self.ip_label, 1)
system_layout.addWidget(server_frame)
# 文件服务状态
service_frame = QFrame()
service_layout = QHBoxLayout(service_frame)
service_layout.addWidget(QLabel("文件服务:"))
self.file_service_label = QLabel("就绪")
self.file_service_label.setStyleSheet("color: #28a745;")
service_layout.addWidget(self.file_service_label, 1)
system_layout.addWidget(service_frame)
system_group.setLayout(system_layout)
layout.addWidget(system_group)
# ==================== 操作日志区域 ====================
log_group = QGroupBox("操作日志")
log_layout = QVBoxLayout()
log_layout.setSpacing(2)
self.log_text = QTextEdit()
self.log_text.setReadOnly(True)
self.log_text.setMinimumHeight(250)
# 添加初始日志
self.log_message("系统", "智能媒体投屏器已启动")
self.log_message("系统", f"本地IP地址: {local_ip}")
log_layout.addWidget(self.log_text, 1)
# 日志控制按钮
log_btn_layout = QHBoxLayout()
log_btn_layout.setSpacing(8)
clear_log_btn = QPushButton("清空日志")
clear_log_btn.clicked.connect(self.log_text.clear)
log_btn_layout.addWidget(clear_log_btn)
log_btn_layout.addStretch()
log_layout.addLayout(log_btn_layout)
log_group.setLayout(log_layout)
layout.addWidget(log_group, 1) # 日志区域占据更多空间
layout.addStretch()
return panel
def init_connections(self):
"""初始化信号连接"""
# 设备发现
self.refresh_devices()
# 启动HTTP文件服务器
self.start_http_server()
def start_http_server(self):
"""启动HTTP文件服务器"""
try:
self.http_server = HTTPFileServer(self.http_port)
if self.http_server.start():
self.log_message("服务器", "HTTP文件服务器启动成功")
self.file_service_label.setText("运行中")
self.file_service_label.setStyleSheet("color: #20c997; font-weight: bold;")
else:
self.log_message("服务器", "HTTP文件服务器启动失败", "error")
self.file_service_label.setText("异常")
self.file_service_label.setStyleSheet("color: #dc3545; font-weight: bold;")
except Exception as e:
self.log_message("服务器", f"启动HTTP服务器出错: {e}", "error")
def enable_refresh_button(self):
"""启用刷新按钮"""
self.refresh_btn.setEnabled(True)
self.refresh_btn.setText("刷新设备")
def on_discovery_error(self, error_message):
"""设备发现错误"""
self.log_message("设备发现", f"搜索失败: {error_message}", "error")
self.enable_refresh_button()
QMessageBox.warning(self, "设备发现失败", f"搜索设备时出错:\n{error_message}")
def refresh_devices(self):
"""刷新设备列表"""
self.log_message("设备发现", "开始搜索网络中的投屏设备...", "info")
self.refresh_btn.setEnabled(False)
self.refresh_btn.setText("搜索中...")
# 在线程中进行避免阻塞UI
def do_quick_discovery():
if not self.device_discovery:
self.device_discovery = DeviceDiscovery()
# 获取刷新后设备信息
new_device = self.device_discovery.discover_media_renderers()
self.update_device_list(new_device, self.selected_device)
# 搜索完成后启用按钮
self.enable_refresh_button()
# 确保在UI线程中更新
thread = threading.Thread(target=do_quick_discovery, daemon=True)
thread.start()
def update_device_list(self, new_devices, current_selected_device=None):
"""
更新设备列表显示保留选中状态使用UDN作为唯一标识
Args:
new_devices: 搜索的设备
current_selected_device:当前选中的设备
"""
#当前选中设备
current_selected_udn = None
if current_selected_device:
current_selected_udn = current_selected_device.get('udn')
# 创建新设备字典以UDN为键
new_device_dict = {}
for device in new_devices:
device_udn = device.get('udn')
if device_udn: # 只有有UDN的设备才处理
new_device_dict[device_udn] = device
# 获取之前列表中所有设备的UDN
current_device_udn_list = []
for i in range(self.device_list.count()):
item = self.device_list.item(i)
device = item.data(Qt.UserRole)
device_udn = device.get('udn')
if device_udn:
current_device_udn_list.append(device_udn)
# 移除不存在的设备
items_to_remove = []
for i in range(self.device_list.count()):
item = self.device_list.item(i)
device = item.data(Qt.UserRole)
# 获取设备的标识优先使用UDN没有则用IP
device_udn = device.get('udn')
# 如果设备标识不在新设备列表中,标记为删除
if device_udn not in new_device_dict:
items_to_remove.append(i)
# 从后往前删除,避免索引变化
for index in reversed(items_to_remove):
self.device_list.takeItem(index)
# 如果删除的是当前选中的设备,清空选中状态
if item and item == self.device_list.currentItem():
self.device_list.clearSelection()
self.selected_device = None
# 更新或添加设备
current_selected_item = None
for device in new_devices:
# 获取设备标识优先使用UDN没有则用IP
device_identifier = device.get('udn')
device_name = device.get('friendly_name', '未知设备')
device_ip = device.get('ip', '未知IP')
# 创建显示文本
if device.get('udn'):
# 简化UDN显示只显示最后一部分
udn_parts = device_identifier.split(':')
udn_display = udn_parts[-1][:8] if len(udn_parts) > 1 else device_identifier[:8]
item_text = f"{device_name} [{device_ip}]"
else:
item_text = f"{device_name} [{device_ip}] (无UDN)"
# 检查设备是否已存在
found_item = None
for i in range(self.device_list.count()):
item = self.device_list.item(i)
existing_device = item.data(Qt.UserRole)
# 比较设备标识
existing_identifier = existing_device.get('udn')
if not existing_identifier:
existing_identifier = existing_device.get('ip', '')
if existing_identifier == device_identifier:
found_item = item
break
if found_item:
# 设备已存在,更新显示文本和设备数据
if found_item.text() != item_text:
found_item.setText(item_text)
# 更新设备数据(可能有新信息)
found_item.setData(Qt.UserRole, device)
# 检查是否是之前选中的设备
if current_selected_udn and device.get('udn') == current_selected_udn:
current_selected_item = found_item
elif current_selected_device and device.get('ip') == current_selected_device.get('ip'):
current_selected_item = found_item
else:
# 设备不存在,添加新项目
item = QListWidgetItem(item_text)
item.setData(Qt.UserRole, device)
self.device_list.addItem(item)
# 检查是否是之前选中的设备
if current_selected_udn and device.get('udn') == current_selected_udn:
current_selected_item = item
elif current_selected_device and device.get('ip') == current_selected_device.get('ip'):
current_selected_item = item
# 恢复选中状态
if current_selected_item:
self.device_list.setCurrentItem(current_selected_item)
self.selected_device = current_selected_item.data(Qt.UserRole)
device_name = self.selected_device.get('friendly_name', '未知设备')
self.log_message("设备选择", f"已恢复选择设备: {device_name}")
elif self.device_list.count() > 0 and self.device_list.currentItem() is None:
# 如果之前选中的设备不存在了,清空选中
self.device_list.clearSelection()
self.selected_device = None
elif self.device_list.count() == 0:
# 没有设备
self.selected_device = None
# 更新设备数量统计
device_count = self.device_list.count()
if device_count > 0:
self.log_message("设备发现", f"发现 {device_count} 个投屏设备")
# 如果有选中的设备,启用控制按钮
if self.selected_device:
self.enable_control_buttons(True)
device_name = self.selected_device.get('friendly_name', '未知设备')
self.log_message("设备选择", f"当前选中: {device_name}")
else:
self.log_message("设备发现", "未发现任何投屏设备", "warning")
self.selected_device = None
self.enable_control_buttons(False)
def select_device_from_list(self, item):
"""从列表中选择设备"""
device = item.data(Qt.UserRole)
if device:
self.selected_device = device
self.log_message("设备选择", f"已选择设备: {device.get('friendly_name', '未知')}")
# 启用控制按钮
self.enable_control_buttons(True)
def browse_file(self):
"""浏览并选择媒体文件"""
file_dialog = QFileDialog()
file_dialog.setFileMode(QFileDialog.ExistingFile)
# 设置文件过滤器
file_dialog.setNameFilter(
"媒体文件 (*.mp4 *.avi *.mkv *.mov *.mp3 *.wav *.flac *.wmv *.flv *.webm "
"*.mpeg *.mpg *.m4v *.3gp *.m4a *.aac *.ogg *.wma *.ape *.alac "
"*.amr *.opus *.ac3 *.dts *.ts *.m2ts *.vob *.ogv *.asf *.rm "
"*.rmvb *.divx *.xvid *.f4v *.m2v *.mpe *.mp2 *.mp1 *.ra *.ram "
"*.mid *.midi *.cda *.gsm *.vox *.voc *.dvf *.raw *.mka *.tta "
"*.spx *.8svx *.aiff *.au *.snd *.paf *.svx *.nist *.ircam "
"*.voc *.smp *.vox *.sou);;"
)
if file_dialog.exec_():
files = file_dialog.selectedFiles()
if files:
self.selected_file = files[0]
file_name = os.path.basename(self.selected_file)
self.file_label.setText(file_name)
#选择文件后启用播放按钮
self.start_btn.setEnabled(True)
self.log_message("文件选择", f"已选择文件: {file_name}")
def start_casting(self):
"""开始投屏"""
if not self.selected_device :
self.log_message("投屏", "请先选择设备", "error")
return
if not self.selected_file:
self.log_message("投屏", "请先选择文件", "error")
return
try:
# 如果是第一次播放需要创建DLNA控制器并设置URI
if not self.dlna_controller:
# 创建DLNA控制器
self.dlna_controller = DLNAController(self.selected_device)
# 启动HTTP服务器并获取文件URL
file_name = os.path.basename(self.selected_file)
dir_bool = self.http_server.set_root_directory(self.selected_file)
self.log_message("设置服务路径", "成功" if dir_bool else "失败")
# 获取IP地址从IP标签中提取
ip_text = self.ip_label.text()
file_url = f"http://{ip_text}/{file_name}"
self.log_message("设置播放地址", file_url)
# 设置播放URI并开始播放
if self.dlna_controller.set_av_transport_uri(file_url):
self.dlna_controller.play()
self.start_btn.setEnabled(False)
self.pause_btn.setEnabled(True)
self.stop_btn.setEnabled(True)
self.log_message("投屏", f"开始投屏: {file_name}")
self.statusBar().showMessage(f"正在投屏: {file_name}")
self.is_paused = False # 确保不是暂停状态
else:
QMessageBox.critical(self, "错误", "投屏失败,请检查设备连接")
return
else:
# 如果已经创建了DLNA控制器可能是从暂停状态恢复
if self.is_paused:
# 从暂停状态恢复播放
if self.dlna_controller.play():
self.is_paused = False
self.start_btn.setEnabled(False)
self.pause_btn.setEnabled(True)
self.stop_btn.setEnabled(True)
self.log_message("投屏", "继续播放")
else:
QMessageBox.critical(self, "错误", "继续播放失败")
else:
# 如果不是暂停状态,说明是停止后的重新播放
# 需要重新设置URI
file_name = os.path.basename(self.selected_file)
ip_text = self.ip_label.text()
file_url = f"http://{ip_text}/{file_name}"
if self.dlna_controller.set_av_transport_uri(file_url):
self.dlna_controller.play()
self.start_btn.setEnabled(False)
self.pause_btn.setEnabled(True)
self.stop_btn.setEnabled(True)
self.log_message("投屏", f"重新开始投屏: {file_name}")
self.statusBar().showMessage(f"正在投屏: {file_name}")
else:
QMessageBox.critical(self, "错误", "重新投屏失败")
except Exception as e:
QMessageBox.critical(self, "错误", f"投屏时出错: {str(e)}")
self.log_message("投屏", f"投屏失败: {str(e)}", "error")
def pause_casting(self):
"""暂停投屏"""
if self.dlna_controller:
if self.dlna_controller.pause():
self.is_paused = True # 标记为暂停状态
self.start_btn.setText("▶ 继续播放")
self.start_btn.setEnabled(True)
self.pause_btn.setEnabled(False)
self.log_message("投屏", "已暂停播放")
def stop_casting(self):
"""停止投屏"""
if self.dlna_controller:
if self.dlna_controller.stop():
self.is_paused = False # 重置暂停状态
self.start_btn.setText("▶ 开始投屏")
self.start_btn.setEnabled(True)
self.pause_btn.setEnabled(False)
self.stop_btn.setEnabled(False)
self.log_message("投屏", "已停止播放")
def volume_changed(self, value):
"""音量改变"""
if self.dlna_controller:
self.dlna_controller.set_volume(value)
def toggle_mute(self, checked):
"""切换静音"""
if self.dlna_controller:
self.dlna_controller.set_mute(checked)
icon = "🔇" if checked else "🔊"
self.mute_btn.setText(icon)
status = "静音" if checked else "取消静音"
self.log_message("控制", f"{status}")
def log_message(self, category, message, level="info"):
"""记录日志消息"""
# timestamp = datetime.now().strftime("%H:%M:%S")
#
# if level == "error":
# color = "#dc3545"
# prefix = "❌"
# elif level == "warning":
# color = "#ffc107"
# prefix = "⚠️"
# else:
# color = "#0d6efd"
# prefix = ""
#
# log_entry = f'<span style="color:#adb5bd;">[{timestamp}]</span><span style="color:{color};"><b>{category}:</b> {prefix} {message}</span><br>'
#
# self.log_text.insertHtml(log_entry)
#
# # 自动滚动到底部
# scrollbar = self.log_text.verticalScrollBar()
# scrollbar.setValue(scrollbar.maximum())
# 同时输出到状态栏
if level == "error":
self.statusBar().showMessage(f"错误: {message}", 1000)
elif level == "warning":
self.statusBar().showMessage(f"警告: {message}", 1000)
else:
self.statusBar().showMessage(message, 1000)
def enable_control_buttons(self, enabled):
"""启用/禁用控制按钮"""
self.pause_btn.setEnabled(False)
self.stop_btn.setEnabled(False)
self.volume_slider.setEnabled(enabled)
self.mute_btn.setEnabled(enabled)
def closeEvent(self, event):
"""窗口关闭事件"""
# 停止HTTP服务器
if self.http_server:
self.http_server.stop()
# 停止DLNA控制器
if self.dlna_controller:
try:
self.dlna_controller.stop()
except:
pass
self.log_message("系统", "应用程序已关闭")
event.accept()