# Source file: MediaCast4/core/http_file_server.py
# Snapshot: 2026-01-12 14:34:45 +08:00 (692 lines, 24 KiB, Python)
# Note: the repository viewer flagged this file as containing ambiguous
# Unicode characters that might be confused with other characters.
# core/http_file_server.py
"""
支持流式传输的HTTP文件服务器适合大文件投屏
支持动态修改根目录和流式传输
"""
import http.server
import socketserver
import threading
import os
import urllib.parse
import mimetypes
import time
from typing import Optional, Tuple
from core.logger import AppLogger
class StreamingHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler that streams files and honours byte-range requests.

    Request paths are resolved relative to ``self.server.current_directory``
    (falling back to ``"."``), so the served root can change while the server
    is running.
    """

    # HTTP/1.1 keeps connections alive, so every response must either carry
    # an explicit Content-Length or close the connection.
    protocol_version = 'HTTP/1.1'

    # Number of bytes read from disk and written to the socket per iteration.
    STREAM_CHUNK_SIZE = 1024 * 256

    # Extension -> MIME type tables consulted before the system database.
    VIDEO_MIME_TYPES = {
        '.mp4': 'video/mp4',
        '.mkv': 'video/x-matroska',
        '.avi': 'video/x-msvideo',
        '.mov': 'video/quicktime',
        '.wmv': 'video/x-ms-wmv',
        '.flv': 'video/x-flv',
        '.webm': 'video/webm',
        '.m4v': 'video/x-m4v',
        '.3gp': 'video/3gpp',
        '.ts': 'video/mp2t',
        '.m2ts': 'video/mp2t',
        '.mts': 'video/mp2t',
    }
    AUDIO_MIME_TYPES = {
        '.mp3': 'audio/mpeg',
        '.wav': 'audio/wav',
        '.flac': 'audio/flac',
        '.aac': 'audio/aac',
        '.ogg': 'audio/ogg',
        '.m4a': 'audio/mp4',
        '.wma': 'audio/x-ms-wma',
    }
    IMAGE_MIME_TYPES = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.webp': 'image/webp',
    }

    # True once the status line of a file response has been queued; after
    # that point an error page can no longer be sent on the same response.
    _response_started = False

    def __init__(self, *args, **kwargs):
        """Build the MIME table and logger, then let the base class serve.

        The base-class constructor handles the request immediately, so all
        instance attributes must exist before ``super().__init__`` runs.
        """
        self.extensions_map = {}
        self.extensions_map.update(self.VIDEO_MIME_TYPES)
        self.extensions_map.update(self.AUDIO_MIME_TYPES)
        self.extensions_map.update(self.IMAGE_MIME_TYPES)
        # Text and miscellaneous types.
        self.extensions_map.update({
            '.html': 'text/html',
            '.htm': 'text/html',
            '.css': 'text/css',
            '.js': 'application/javascript',
            '.json': 'application/json',
            '.txt': 'text/plain',
            '.xml': 'application/xml',
            '.pdf': 'application/pdf',
            '.ico': 'image/x-icon',
            '.svg': 'image/svg+xml',
        })
        self.logger = AppLogger('StreamingHTTPServer')
        super().__init__(*args, **kwargs)

    def get_server_root(self) -> str:
        """Return the server's current root directory (``"."`` if unset)."""
        if hasattr(self.server, 'current_directory'):
            return self.server.current_directory
        return "."

    @staticmethod
    def _is_within(root: str, candidate: str) -> bool:
        """Return True when ``candidate`` is ``root`` or lies beneath it.

        Compares whole path components; a plain ``startswith`` would accept
        ``/srv/media2`` as being inside ``/srv/media``.
        """
        try:
            root = os.path.normcase(root)
            candidate = os.path.normcase(candidate)
            return os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            return False

    def translate_path(self, path: str) -> str:
        """Convert a URL path into a filesystem path under the server root.

        Args:
            path: Raw request path (may include a query string).

        Returns:
            The filesystem path, or an empty string if the request is invalid
            or would resolve outside the root directory.
        """
        try:
            parsed_path = urllib.parse.urlparse(path)
            url_path = urllib.parse.unquote(parsed_path.path)
            root_dir = self.get_server_root()

            # An embedded NUL can never name a real file and makes os.path
            # functions raise, so reject it up front.
            if '\x00' in url_path:
                self.logger.warning(f"检测到目录遍历攻击: {path}")
                return ""

            # Strip *all* leading separators: a remaining leading slash would
            # make os.path.join discard the root and yield an absolute path.
            url_path = url_path.lstrip('/' + os.sep)

            # Reject parent-directory segments (not any '..' substring, which
            # would also refuse legitimate names such as "a..b.mp4").
            if '..' in url_path.replace('\\', '/').split('/'):
                self.logger.warning(f"检测到目录遍历攻击: {path}")
                return ""

            full_path = os.path.normpath(os.path.join(root_dir, url_path))

            # Second line of defence: the result must stay under the root.
            abs_root = os.path.abspath(root_dir)
            abs_path = os.path.abspath(full_path)
            if not self._is_within(abs_root, abs_path):
                self.logger.warning(f"路径超出根目录范围: {full_path}")
                return ""
            return full_path
        except Exception as e:
            self.logger.error(f"路径转换失败: {e}")
            return ""

    def guess_mime_type(self, file_path: str) -> str:
        """Guess the MIME type of ``file_path`` from its extension.

        The handler's own table wins, then the system ``mimetypes`` database,
        then ``application/octet-stream``.
        """
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type:
            return mime_type
        return 'application/octet-stream'

    def parse_range_header(self, range_header: str, file_size: int) -> Optional[Tuple[int, int]]:
        """Parse a single-range ``Range`` header.

        Supported forms are ``bytes=start-end``, ``bytes=start-`` and the
        suffix form ``bytes=-N`` (the last N bytes). An end position past the
        end of the file is clamped to the last byte.

        Args:
            range_header: Value of the Range header (may be None or empty).
            file_size: Size of the file in bytes.

        Returns:
            An inclusive ``(start, end)`` tuple, or None when the header is
            absent, malformed, multi-range, or cannot be satisfied.
        """
        if not range_header or file_size <= 0:
            return None

        unit, sep, spec = range_header.strip().partition('=')
        if not sep or unit.strip().lower() != 'bytes':
            return None
        spec = spec.strip()
        # Multi-range responses (multipart/byteranges) are not supported.
        if ',' in spec or '-' not in spec:
            return None

        start_str, _, end_str = spec.partition('-')
        start_str = start_str.strip()
        end_str = end_str.strip()
        last_byte = file_size - 1
        try:
            if not start_str:
                # Suffix range: "bytes=-N" means the final N bytes.
                if not end_str:
                    return None
                suffix_length = int(end_str)
                if suffix_length <= 0:
                    return None
                start = max(file_size - suffix_length, 0)
                end = last_byte
            else:
                start = int(start_str)
                end = int(end_str) if end_str else last_byte
        except ValueError as e:
            self.logger.error(f"解析Range头失败: {e}")
            return None

        end = min(end, last_byte)
        if start < 0 or start > end:
            return None
        return (start, end)

    def send_file_streaming(self, file_path: str, start_pos: int = 0,
                            end_pos: Optional[int] = None,
                            partial: Optional[bool] = None):
        """Send (part of) a file to the client in fixed-size chunks.

        Content-Length is always sent (rather than chunked transfer encoding)
        so players can show progress and seek.

        Args:
            file_path: Path of the file to send.
            start_pos: First byte offset to send.
            end_pos: Last byte offset to send (inclusive); None means the end
                of the file.
            partial: True to answer 206 with Content-Range, False to answer
                200. None infers it from whether the span covers the whole
                file.

        Raises:
            Exception: Any error other than a client disconnect is logged and
                re-raised.
        """
        try:
            file_size = os.path.getsize(file_path)
            if end_pos is None:
                end_pos = file_size - 1
            content_length = end_pos - start_pos + 1
            if partial is None:
                partial = not (start_pos == 0 and end_pos == file_size - 1)
            # Resolve everything that can fail before any header is queued.
            mime_type = self.guess_mime_type(file_path)

            with open(file_path, 'rb') as f:
                f.seek(start_pos)

                self._response_started = True
                self.send_response(206 if partial else 200)
                if partial:
                    self.send_header('Content-Range', f'bytes {start_pos}-{end_pos}/{file_size}')
                self.send_header('Content-Length', str(content_length))
                self.send_header('Content-Type', mime_type)
                self.send_header('Accept-Ranges', 'bytes')
                self.send_header('Access-Control-Allow-Origin', '*')
                self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS')
                self.send_header('Access-Control-Allow-Headers', 'Range')
                self.send_header('Access-Control-Expose-Headers', 'Content-Length, Content-Range')
                # Disable caching.
                self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
                self.send_header('Pragma', 'no-cache')
                self.send_header('Expires', '0')
                self.end_headers()

                remaining = content_length
                while remaining > 0:
                    data = f.read(min(self.STREAM_CHUNK_SIZE, remaining))
                    if not data:
                        # File ended early: the promised Content-Length cannot
                        # be met, so the connection must not be reused.
                        self.close_connection = True
                        break
                    try:
                        # The blocking socket write paces the loop; no
                        # artificial sleep is needed.
                        self.wfile.write(data)
                        self.wfile.flush()
                    except ConnectionError:
                        self.logger.debug("客户端断开连接")
                        self.close_connection = True
                        return
                    remaining -= len(data)

            if remaining == 0:
                self.logger.debug(f"文件传输完成: {file_path} ({content_length} bytes)")
        except Exception as e:
            self.logger.error(f"流式发送文件失败: {e}")
            raise

    def _resolve_request_file(self) -> Optional[str]:
        """Map ``self.path`` to an existing regular file.

        Sends the 404/403 error response itself and returns None when the
        request cannot be served. The localized text goes into ``explain``
        (UTF-8 body) because the status line only accepts latin-1.
        """
        file_path = self.translate_path(self.path)
        if not file_path:
            self.send_error(404, explain="文件未找到")
            return None
        if not os.path.exists(file_path):
            self.send_error(404, explain=f"文件不存在: {self.path}")
            return None
        if not os.path.isfile(file_path):
            self.send_error(403, explain=f"不是文件: {self.path}")
            return None
        return file_path

    def _send_internal_error(self):
        """Report a 500, or drop the connection if a response already began."""
        if self._response_started:
            self.close_connection = True
            return
        try:
            self.send_error(500, explain="内部服务器错误")
        except OSError:
            # The client is gone; nothing more can be sent.
            self.close_connection = True

    def do_GET(self):
        """Handle GET: serve the whole file, or the requested byte range."""
        # One handler instance serves every request of a keep-alive connection.
        self._response_started = False
        try:
            client_ip = self.client_address[0]
            self.logger.debug(f"GET请求: {self.path} from {client_ip}")

            file_path = self._resolve_request_file()
            if file_path is None:
                return

            file_size = os.path.getsize(file_path)
            range_info = self.parse_range_header(self.headers.get('Range'), file_size)
            if range_info is not None:
                start_pos, end_pos = range_info
                self.send_file_streaming(file_path, start_pos, end_pos, partial=True)
            else:
                # No usable Range header: send the complete file, streamed.
                self.send_file_streaming(file_path, 0, file_size - 1, partial=False)
        except ConnectionError:
            self.logger.debug("客户端断开连接")
            self.close_connection = True
        except Exception as e:
            self.logger.error(f"处理GET请求失败: {e}")
            self._send_internal_error()

    def do_HEAD(self):
        """Handle HEAD: send the headers of a full-file response, no body."""
        self._response_started = False
        try:
            self.logger.debug(f"HEAD请求: {self.path}")

            file_path = self._resolve_request_file()
            if file_path is None:
                return

            file_size = os.path.getsize(file_path)
            mime_type = self.guess_mime_type(file_path)

            self._response_started = True
            self.send_response(200)
            self.send_header('Content-Type', mime_type)
            self.send_header('Content-Length', str(file_size))
            self.send_header('Accept-Ranges', 'bytes')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS')
            self.end_headers()
        except ConnectionError:
            self.logger.debug("客户端断开连接")
            self.close_connection = True
        except Exception as e:
            self.logger.error(f"处理HEAD请求失败: {e}")
            self._send_internal_error()

    def do_OPTIONS(self):
        """Handle OPTIONS (CORS preflight) with an empty 200 response."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Range')
        self.send_header('Access-Control-Max-Age', '86400')
        # Without an explicit length a keep-alive client waits for a body.
        self.send_header('Content-Length', '0')
        self.end_headers()

    def log_message(self, format, *args):
        """Forward only error-style messages to the application logger.

        Ordinary access-log lines are dropped to keep logging cheap; only
        messages shaped like the base class's ``"code N, message M"`` error
        line that do not mention 200 are recorded.
        """
        message = format % args
        if 'code' in message and 'message' in message:
            if '200' not in message:
                self.logger.info(f"HTTP响应: {message}")
class StreamingHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Threaded TCP server whose served root directory can change at runtime.

    Request handlers read ``current_directory`` on every request, so a call to
    ``set_directory`` takes effect for subsequent requests.
    """

    allow_reuse_address = True
    daemon_threads = True

    def __init__(self, server_address, request_handler_class, initial_directory="."):
        """Create the server and bind it.

        Args:
            server_address: ``(host, port)`` tuple to bind to.
            request_handler_class: Handler class instantiated per connection.
            initial_directory: Initial root directory; stored as an absolute
                path.
        """
        self.current_directory = os.path.abspath(initial_directory)
        # Larger listen backlog; must be set before the base class binds.
        self.request_queue_size = 50
        super().__init__(server_address, request_handler_class)

    def set_directory(self, directory: str) -> bool:
        """Set a new root directory.

        Args:
            directory: The new root directory. For convenience a path to an
                existing file is also accepted, in which case the file's
                containing directory becomes the root.

        Returns:
            True if the root was changed, False if the path does not resolve
            to an existing directory (the current root is then left as is).
        """
        target = os.path.abspath(directory)
        if os.path.isfile(target):
            target = os.path.dirname(target)
        if not os.path.isdir(target):
            return False
        self.current_directory = target
        return True
class StreamingFileServer:
    """Manager that runs a StreamingHTTPServer on a background thread."""

    def __init__(self, port: int = 8000, bind_address: str = "0.0.0.0"):
        """Store the configuration; no socket is opened until ``start``.

        Args:
            port: TCP port to listen on.
            bind_address: Address to bind to.
        """
        self.port = port
        self.bind_address = bind_address
        self.httpd: Optional[StreamingHTTPServer] = None
        self.server_thread: Optional[threading.Thread] = None
        self.is_running = False
        self.logger = AppLogger('StreamingFileServer')
        # Load the system MIME database once.
        mimetypes.init()

    def start(self, directory: str = ".") -> bool:
        """Start serving ``directory`` on a daemon thread.

        Args:
            directory: Initial root directory; must exist and be a directory.

        Returns:
            True on success, False if already running, if the directory is
            invalid, or if the server could not be created.
        """
        if self.is_running:
            self.logger.warning("服务器已经在运行")
            return False
        if not os.path.exists(directory):
            self.logger.error(f"目录不存在: {directory}")
            return False
        if not os.path.isdir(directory):
            self.logger.error(f"路径不是目录: {directory}")
            return False

        server = None
        try:
            server = StreamingHTTPServer(
                (self.bind_address, self.port),
                StreamingHTTPRequestHandler,
                initial_directory=directory
            )
            # Timeout in seconds.
            server.timeout = 30
            self.httpd = server
            self.server_thread = threading.Thread(
                target=self._run_server,
                daemon=True,
                name="StreamingHTTPServer"
            )
            # Set the flag before the thread starts: _run_server clears it in
            # its ``finally`` and must not be overwritten afterwards.
            self.is_running = True
            self.server_thread.start()
            self.logger.info(f"流式文件服务器已启动: http://{self.bind_address}:{self.port}")
            self.logger.info(f"初始根目录: {directory}")
            self.logger.info("服务器支持流式传输和断点续传")
            return True
        except Exception as e:
            self.logger.error(f"启动服务器失败: {e}")
            self.is_running = False
            if server is not None:
                # Release the listening socket of the half-started server.
                server.server_close()
                self.httpd = None
            return False

    def _run_server(self):
        """Thread target: serve until shut down, then clear ``is_running``."""
        try:
            self.logger.info(f"服务器监听在 {self.bind_address}:{self.port}")
            self.httpd.serve_forever()
        except Exception as e:
            if self.is_running:
                self.logger.error(f"服务器错误: {e}")
        finally:
            self.is_running = False

    def set_root_directory(self, directory: str) -> bool:
        """Change the root directory of the running server.

        Args:
            directory: The new root directory.

        Returns:
            True if the root was changed, False otherwise.
        """
        if not self.is_running or not self.httpd:
            self.logger.error("服务器未运行")
            return False
        try:
            if not os.path.exists(directory):
                self.logger.error(f"目录不存在: {directory}")
                return False
            if self.httpd.set_directory(directory):
                self.logger.info(f"根目录已更改为: {directory}")
                return True
            self.logger.error(f"设置根目录失败: {directory}")
            return False
        except Exception as e:
            self.logger.error(f"设置根目录时出错: {e}")
            return False

    def get_root_directory(self) -> str:
        """Return the current root directory (``"."`` if no server exists)."""
        if self.httpd:
            return self.httpd.current_directory
        return "."

    def stop(self):
        """Stop the server, close its socket and wait for the thread to end."""
        try:
            self.is_running = False
            if self.httpd:
                self.httpd.shutdown()
                self.httpd.server_close()
                self.logger.info("流式文件服务器已停止")
            worker = self.server_thread
            if (worker is not None and worker.is_alive()
                    and worker is not threading.current_thread()):
                # Bounded wait so a stuck thread cannot hang the caller.
                worker.join(timeout=5)
        except Exception as e:
            self.logger.error(f"停止服务器时出错: {e}")

    def get_file_url(self, file_path: str) -> Optional[str]:
        """Build the HTTP URL under which ``file_path`` is served.

        Args:
            file_path: Absolute path, or path relative to the current root.

        Returns:
            The URL, or None if the file is outside the root, does not exist,
            or is not a regular file. The host part is ``bind_address``
            verbatim.
        """
        try:
            abs_root_dir = os.path.abspath(self.get_root_directory())
            # os.path.join keeps an absolute file_path unchanged and anchors
            # a relative one at the root.
            abs_file_path = os.path.abspath(os.path.join(abs_root_dir, file_path))

            # Compare whole components: a startswith() test would accept
            # "/srv/media2/x" for the root "/srv/media".
            try:
                inside_root = os.path.commonpath(
                    [os.path.normcase(abs_root_dir), os.path.normcase(abs_file_path)]
                ) == os.path.normcase(abs_root_dir)
            except ValueError:
                # e.g. different drives on Windows.
                inside_root = False
            if not inside_root:
                self.logger.warning(f"文件不在当前根目录下: {file_path}")
                return None

            if not os.path.isfile(abs_file_path):
                self.logger.warning(f"文件不存在: {abs_file_path}")
                return None

            relative_path = os.path.relpath(abs_file_path, abs_root_dir)
            encoded_path = urllib.parse.quote(relative_path.replace('\\', '/'))
            return f"http://{self.bind_address}:{self.port}/{encoded_path}"
        except Exception as e:
            self.logger.error(f"生成文件URL失败: {e}")
            return None

    def get_server_info(self) -> dict:
        """Return a dict describing the server's state and configuration."""
        return {
            'is_running': self.is_running,
            'bind_address': self.bind_address,
            'port': self.port,
            'root_directory': self.get_root_directory(),
            'url': f"http://{self.bind_address}:{self.port}"
        }
# Extended streaming server class
class HTTPFileServer(StreamingFileServer):
    """StreamingFileServer variant that also carries statistics fields.

    NOTE(review): nothing in the code visible here updates
    ``active_connections`` / ``total_served_bytes`` or passes
    ``max_chunk_size`` to the request handler; confirm whether that wiring
    exists elsewhere.
    """

    def __init__(self, port: int = 8000, bind_address: str = "0.0.0.0", max_chunk_size: int = 1024 * 256):  # 256KB
        """Initialise the base server and the statistics fields.

        Args:
            port: TCP port to listen on.
            bind_address: Address to bind to.
            max_chunk_size: Maximum chunk size in bytes.
        """
        super().__init__(port, bind_address)
        # Guards reads of the counters below in get_stats().
        self.stats_lock = threading.Lock()
        self.max_chunk_size = max_chunk_size
        self.active_connections = 0
        self.total_served_bytes = 0

    def _run_server(self):
        """Log the configured chunk size, then run the base server loop."""
        self.logger.info(f"高级流式服务器启动,块大小: {self.max_chunk_size}字节")
        super()._run_server()

    def get_stats(self) -> dict:
        """Return a snapshot of the statistics fields, taken under the lock."""
        with self.stats_lock:
            snapshot = {
                'active_connections': self.active_connections,
                'total_served_bytes': self.total_served_bytes,
                'is_running': self.is_running,
                'root_directory': self.get_root_directory()
            }
        return snapshot
# Usage example
def test_streaming_server():
    """Manual demo: serve the current directory on port 8080 until Ctrl+C."""
    import time

    server = HTTPFileServer(port=8080)
    print("启动流式文件服务器...")

    # Guard clause: nothing else to do when the server cannot start.
    if not server.start("."):
        print("启动服务器失败")
        return

    print(f"服务器已启动: http://localhost:8080")
    print(f"当前目录: {server.get_root_directory()}")

    # Show example commands if the sample file exists under the root.
    test_file = "test.mp4"
    file_url = server.get_file_url(test_file)
    if file_url:
        print(f"文件URL: {file_url}")
        print("你可以使用以下命令测试:")
        print(f" curl -I {file_url} # 查看文件信息")
        print(f" curl --range 0-999 {file_url} # 测试范围请求")

    # Print the statistics once a second until interrupted.
    try:
        while True:
            time.sleep(1)
            stats = server.get_stats()
            served_mb = stats['total_served_bytes'] / 1024 / 1024
            print(f"\r活跃连接: {stats['active_connections']}, "
                  f"已传输: {served_mb:.2f} MB", end='')
    except KeyboardInterrupt:
        print("\n\n正在停止服务器...")
    finally:
        server.stop()
        print("服务器已停止")
# Run the demo server only when this file is executed as a script.
if __name__ == "__main__":
    test_streaming_server()