Files
MediaCast3/core/device_discovery.py

486 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import socket
import time
import urllib.request
import urllib.error
import xml.etree.ElementTree as ET
from core.logger import AppLogger
class DeviceDiscovery:
"""
DLNA/UPnP 媒体渲染器发现类
用于发现局域网中的投屏设备
"""
def __init__(self):
"""
初始化设备发现器
"""
self.logger = AppLogger('DeviceDiscovery')
self.discovered_devices = []
def discover_media_renderers(self, timeout=4):
"""
发现局域网中的媒体渲染器(投屏设备)
只返回 deviceType 为 urn:schemas-upnp-org:device:MediaRenderer:1 的设备
通过 UUID 去重
Args:
timeout: 搜索超时时间(秒)
Returns:
list: 发现的设备列表
"""
devices_by_uuid = {} # 用UUID去重
pending_devices = {} # 待处理的设备key为locationvalue为(ip, headers)
# 获取本机IP
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
except:
local_ip = "获取失败"
# SSDP 发现消息
ssdp_msg = (
"M-SEARCH * HTTP/1.1\r\n"
"HOST: 239.255.255.250:1900\r\n"
"MAN: \"ssdp:discover\"\r\n"
"MX: 3\r\n"
"ST: ssdp:all\r\n"
"USER-AGENT: Python DLNA Discovery\r\n"
"\r\n"
)
# 创建 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
# 绑定本机ip
sock.bind((local_ip, 0))
# 发送发现请求
sock.sendto(ssdp_msg.encode(), ('239.255.255.250', 1900))
sock.setblocking(0) # 设置为非阻塞模式
self.logger.info(f"开始搜索媒体渲染器,超时时间: {timeout}秒...")
# 使用 select 监听socket避免while循环
import select
start_time = time.time()
response_count = 0
while True:
# 计算剩余超时时间
remaining_time = timeout - (time.time() - start_time)
if remaining_time <= 0:
break
# 使用select监听socket
ready_to_read, _, _ = select.select([sock], [], [], remaining_time)
if sock in ready_to_read:
try:
data, addr = sock.recvfrom(4096)
response_count += 1
response = data.decode('utf-8', errors='ignore')
# 解析SSDP响应头
lines = response.split('\r\n')
headers = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip().upper()] = value.strip()
# 检查是否有LOCATION头
location = headers.get('LOCATION')
if location:
pending_devices[location] = (addr[0], headers)
except socket.error as e:
self.logger.debug(f"接收数据时出错: {e}")
continue
else:
# select 超时
break
sock.close()
# 并发获取设备详情
if pending_devices:
from concurrent.futures import ThreadPoolExecutor, as_completed
self.logger.info(f"收到{response_count}个SSDP响应准备并发获取{len(pending_devices)}个设备详情...")
with ThreadPoolExecutor(max_workers=min(10, len(pending_devices))) as executor:
# 提交所有设备详情获取任务
future_to_location = {
executor.submit(self._get_device_details, location, ip, headers): location
for location, (ip, headers) in pending_devices.items()
}
# 收集结果
for future in as_completed(future_to_location):
try:
device_info = future.result(timeout=5)
if device_info:
uuid = device_info.get('udn', '')
if uuid and uuid not in devices_by_uuid:
devices_by_uuid[uuid] = device_info
device_name = device_info.get('friendly_name', '未知')
self.logger.info(f"发现新设备: {device_name} (UUID: {uuid[:20]}...)")
except Exception as e:
location = future_to_location[future]
self.logger.debug(f"获取设备详情失败 {location}: {e}")
self.logger.info(f"搜索完成: 发现{len(devices_by_uuid)}个媒体渲染器")
self.discovered_devices = list(devices_by_uuid.values())
return self.discovered_devices
def _get_device_details(self, location, ip, headers):
"""
获取单个设备的详细信息(提取出来便于并发调用)
Args:
location: 设备描述文档URL
ip: 设备IP地址
headers: SSDP响应头
Returns:
dict: 设备信息如果不是MediaRenderer则返回None
"""
usn = headers.get('USN', '')
st = headers.get('ST', headers.get('NT', ''))
try:
# 获取设备描述文档
req = urllib.request.Request(location)
req.add_header('User-Agent', 'Python DLNA Client')
req.timeout = 3
with urllib.request.urlopen(req, timeout=3) as response:
xml_data = response.read().decode('utf-8', errors='ignore')
# 解析 XML
root = ET.fromstring(xml_data)
ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'}
# 提取设备信息
device = root.find('.//upnp:device', ns)
if device is None:
self.logger.debug(f"{location} 未找到设备信息")
return None
# 检查设备类型
device_type_elem = device.find('upnp:deviceType', ns)
if device_type_elem is None or device_type_elem.text != 'urn:schemas-upnp-org:device:MediaRenderer:1':
return None # 不是 MediaRenderer 设备,跳过
# 提取设备信息
friendly_name_elem = device.find('upnp:friendlyName', ns)
manufacturer_elem = device.find('upnp:manufacturer', ns)
model_name_elem = device.find('upnp:modelName', ns)
model_description_elem = device.find('upnp:modelDescription', ns)
udn_elem = device.find('upnp:UDN', ns)
# 提取服务列表
services = []
service_list = root.find('.//upnp:serviceList', ns)
if service_list is not None:
for service in service_list.findall('upnp:service', ns):
service_type_elem = service.find('upnp:serviceType', ns)
if service_type_elem is not None:
services.append(service_type_elem.text)
device_info = {
'ip': ip,
'location': location,
'server': headers.get('SERVER', ''),
'usn': usn,
'st': st,
'friendly_name': friendly_name_elem.text if friendly_name_elem is not None else '',
'manufacturer': manufacturer_elem.text if manufacturer_elem is not None else '',
'model_name': model_name_elem.text if model_name_elem is not None else '',
'model_description': model_description_elem.text if model_description_elem is not None else '',
'device_type': device_type_elem.text,
'udn': udn_elem.text if udn_elem is not None else '',
'services': services,
'response_time': time.strftime('%H:%M:%S')
}
self.logger.debug(f"成功获取设备详情: {device_info.get('friendly_name')} ({ip})")
return device_info
except (urllib.error.URLError, socket.timeout, ConnectionError) as e:
self.logger.debug(f"无法访问设备描述文档 {location}: {e}")
return None
except ET.ParseError as e:
self.logger.debug(f"解析设备XML失败: {location}, 错误: {e}")
return None
except Exception as e:
self.logger.debug(f"获取设备详情时出错: {e}")
return None
def _get_device_details_from_response(self, response, ip):
"""
从SSDP响应中获取设备详情只返回 MediaRenderer 设备
Args:
response: SSDP响应数据
ip: 设备IP地址
Returns:
dict: 设备信息如果不是MediaRenderer则返回None
"""
# 解析SSDP响应头
lines = response.split('\r\n')
headers = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip().upper()] = value.strip()
# 检查是否有LOCATION头
location = headers.get('LOCATION')
if not location:
return None
# 先检查USN是否包含MediaRenderer字样快速过滤
usn = headers.get('USN', '')
st = headers.get('ST', headers.get('NT', ''))
try:
# 获取设备描述文档
req = urllib.request.Request(location)
req.add_header('User-Agent', 'Python DLNA Client')
with urllib.request.urlopen(req, timeout=4) as response:
xml_data = response.read().decode('utf-8', errors='ignore')
# 解析 XML
root = ET.fromstring(xml_data)
ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'}
# 提取设备信息
device = root.find('.//upnp:device', ns)
if device is None:
self.logger.debug(f"{location} 未找到设备信息")
return None
# 检查设备类型
device_type_elem = device.find('upnp:deviceType', ns)
if device_type_elem is None or device_type_elem.text != 'urn:schemas-upnp-org:device:MediaRenderer:1':
return None # 不是 MediaRenderer 设备,跳过
# 提取设备信息
friendly_name_elem = device.find('upnp:friendlyName', ns)
manufacturer_elem = device.find('upnp:manufacturer', ns)
model_name_elem = device.find('upnp:modelName', ns)
model_description_elem = device.find('upnp:modelDescription', ns)
udn_elem = device.find('upnp:UDN', ns)
# 提取服务列表
services = []
service_list = root.find('.//upnp:serviceList', ns)
if service_list is not None:
for service in service_list.findall('upnp:service', ns):
service_type_elem = service.find('upnp:serviceType', ns)
if service_type_elem is not None:
services.append(service_type_elem.text)
device_info = {
'ip': ip,
'location': location,
'server': headers.get('SERVER', ''),
'usn': usn,
'st': st,
'friendly_name': friendly_name_elem.text if friendly_name_elem is not None else '',
'manufacturer': manufacturer_elem.text if manufacturer_elem is not None else '',
'model_name': model_name_elem.text if model_name_elem is not None else '',
'model_description': model_description_elem.text if model_description_elem is not None else '',
'device_type': device_type_elem.text,
'udn': udn_elem.text if udn_elem is not None else '',
'services': services,
'response_time': time.strftime('%H:%M:%S')
}
self.logger.debug(f"成功获取设备详情: {device_info.get('friendly_name')} ({ip})")
return device_info
except urllib.error.URLError as e:
self.logger.debug(f"无法访问设备描述文档 {location}: {e}")
return None
except socket.timeout:
self.logger.debug(f"访问设备描述文档超时: {location}")
return None
except ET.ParseError as e:
self.logger.debug(f"解析设备XML失败: {location}, 错误: {e}")
return None
except Exception as e:
self.logger.debug(f"获取设备详情时出错: {e}")
return None
def display_discovered_devices(self):
"""
显示已发现的媒体渲染器列表
"""
if not self.discovered_devices:
self.logger.info("未发现任何媒体渲染器(投屏设备)")
return
self.logger.info(f"发现 {len(self.discovered_devices)} 个媒体渲染器(投屏设备):")
for i, device in enumerate(self.discovered_devices, 1):
device_name = device.get('friendly_name', '未知')
device_ip = device.get('ip', '未知')
manufacturer = device.get('manufacturer', '未知')
model = device.get('model_name', '未知')
model_desc = device.get('model_description', '')
self.logger.info(f"设备{i}:{device_name},IP地址:{device_ip},制造商:{manufacturer},型号:{model},描述:{model_desc}")
def check_device_isline(self, devices, timeout=2, max_threads=5):
"""
快速检测指定的设备列表是否在线
通过访问设备的 location URL 来检测
Args:
devices: 待检测的设备列表
timeout: 每个设备的检测超时时间(秒)
max_threads: 最大并发线程数默认5个
Returns:
dict: 包含在线设备列表和离线设备列表的字典
"""
if not devices:
self.logger.info("没有需要检测的设备")
return []
from concurrent.futures import ThreadPoolExecutor, as_completed
self.logger.info(f"开始检测 {len(devices)} 个设备是否在线,超时时间: {timeout}")
online_devices = []
def check_single_device(device):
"""检测单个设备是否在线"""
uuid = device.get('udn', '未知')
device_name = device.get('friendly_name', '未知')
location = device.get('location', '')
ip = device.get('ip', '未知')
if not location:
self.logger.debug(f"设备 {device_name} 没有location信息")
return device, False
try:
# 设置超时访问设备的location URL
req = urllib.request.Request(location)
req.add_header('User-Agent', 'Python DLNA Availability Checker')
req.timeout = timeout
with urllib.request.urlopen(req, timeout=timeout) as response:
# 检查响应状态码
if response.status == 200:
self.logger.debug(f"设备 {device_name} ({ip}) 在线")
return device, True
else:
self.logger.debug(f"设备 {device_name} ({ip}) 响应异常: {response.status}")
return device, False
except urllib.error.URLError as e:
self.logger.debug(f"设备 {device_name} ({ip}) 无法访问: {e}")
return device, False
except socket.timeout:
self.logger.debug(f"设备 {device_name} ({ip}) 检测超时")
return device, False
except Exception as e:
self.logger.debug(f"设备 {device_name} ({ip}) 检测出错: {e}")
return device, False
# 使用线程池并发检测设备
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# 提交所有检测任务
future_to_device = {
executor.submit(check_single_device, device): device
for device in devices
}
# 收集结果
for future in as_completed(future_to_device):
try:
device, is_online = future.result(timeout=timeout + 1)
if is_online:
online_devices.append(device)
self.logger.info(f"设备:{device.get('friendly_name')}[{device.get('ip')}]在线")
except Exception as e:
device = future_to_device[future]
self.logger.error(f"检测设备 {device.get('friendly_name', '未知')} 时出错: {e}")
return online_devices
def test_raw_discovery(self, timeout=5):
"""
测试原始的SSDP发现查看所有响应
"""
ssdp_msg = (
"M-SEARCH * HTTP/1.1\r\n"
"HOST: 239.255.255.250:1900\r\n"
"MAN: \"ssdp:discover\"\r\n"
"MX: 3\r\n"
"ST: ssdp:all\r\n" # 搜索所有设备
"USER-AGENT: Python DLNA Discovery\r\n"
"\r\n"
)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.bind(('', 0))
# 发送到所有可能的端口
sock.sendto(ssdp_msg.encode(), ('239.255.255.250', 1900))
sock.settimeout(timeout)
responses = []
start_time = time.time()
while time.time() - start_time < timeout:
try:
data, addr = sock.recvfrom(4096)
response = data.decode('utf-8', errors='ignore')
responses.append((addr, response))
# 打印原始响应
print(f"\n收到来自 {addr} 的响应:")
print(response[:200] + "..." if len(response) > 200 else response)
# 尝试解析USN
lines = response.split('\r\n')
for line in lines:
if 'USN:' in line or 'ST:' in line or 'LOCATION:' in line:
print(f" {line.strip()}")
except socket.timeout:
break
except Exception as e:
print(f"接收错误: {e}")
sock.close()
print(f"\n总共收到 {len(responses)} 个原始响应")
return responses
def main():
discovery = DeviceDiscovery()
devices = discovery.discover_media_renderers()
discovery.display_discovered_devices()
print(devices)
if __name__ == '__main__':
main()