目录
1、安装必要的软件以及包
# 更新系统包
apt update
# 安装Python3和pip
apt install -y python3 python3-pip
# 安装需要的Python包
pip3 install tmdbsimple requests parse-torrent-name
2、创建工作和日志目录
# 创建脚本目录
mkdir -p /root/scripts
# 创建日志文件
touch /var/log/media_generator.log
chmod 644 /var/log/media_generator.log
3、创建主脚本文件:
# 创建并编辑脚本文件
nano /root/scripts/generate_media_files.py
将以下完整代码复制到编辑器中:
#!/usr/bin/env python3
import os
import re
import json
import shutil
import requests
import logging
from pathlib import Path
from datetime import datetime
from urllib.parse import quote
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/media_generator.log'),
logging.StreamHandler()
]
)
# Alist配置
ALIST_CONFIG = {
'host': 'http://127.0.0.1:5244',
'username': 'alist登录账号',
'password': 'alist登录密码'
}
# 其他配置
SOURCE_DIR = '/mnt/alistpan'
MEDIA_DIR = '/qingliangdata/alistod'
# 文件扩展名配置
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.avi', '.m4v', '.ts', '.wmv', '.m2ts', '.mov'}
SUBTITLE_EXTENSIONS = {'.srt', '.ass', '.ssa', '.sub', '.idx', '.sup', '.smi'}
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.tbn', '.gif'}
NFO_EXTENSIONS = {'.nfo', '.info'}
class AlistClient:
def __init__(self, config):
self.host = config['host']
self.username = config['username']
self.password = config['password']
self.token = None
self.token_expire = 0
def login(self):
"""登录Alist获取token"""
try:
response = requests.post(
f"{self.host}/api/auth/login",
json={
"username": self.username,
"password": self.password
}
)
data = response.json()
if data['code'] == 200:
self.token = data['data']['token']
logging.info("Alist登录成功")
return True
else:
logging.error(f"Alist登录失败: {data['message']}")
return False
except Exception as e:
logging.error(f"Alist登录异常: {str(e)}")
return False
def get_file_link(self, file_path):
"""获取文件的直链"""
if not self.token:
if not self.login():
return None
try:
# 移除开头的/mnt/alistpan
alist_path = file_path.replace('/mnt/alistpan/', '/')
# 获取下载链接
response = requests.post(
f"{self.host}/api/fs/link",
headers={"Authorization": self.token},
json={
"path": alist_path,
"password": "",
}
)
data = response.json()
if data['code'] == 200:
link_data = data['data']
# 检查返回的数据是否是字典类型并包含url字段
if isinstance(link_data, dict) and 'url' in link_data:
raw_url = link_data['url']
logging.info(f"获取到下载链接URL: {raw_url}")
return raw_url
else:
logging.error(f"无效的链接数据格式: {link_data}")
return None
elif data['code'] == 401:
# token过期,重新登录
if self.login():
return self.get_file_link(file_path)
else:
logging.error(f"获取直链失败: {data['message']}, 文件路径: {file_path}")
return None
except Exception as e:
logging.error(f"获取直链异常: {str(e)}, 文件路径: {file_path}")
return None
class MediaProcessor:
def __init__(self):
self.processed_count = 0
self.error_count = 0
self.alist_client = AlistClient(ALIST_CONFIG)
self.running = True
def stop(self):
"""停止处理"""
self.running = False
logging.info("正在停止处理...")
def get_file_type(self, filename):
"""判断文件类型"""
ext = os.path.splitext(filename.lower())[1]
if ext in VIDEO_EXTENSIONS:
return 'video'
elif ext in SUBTITLE_EXTENSIONS:
return 'subtitle'
elif ext in IMAGE_EXTENSIONS:
return 'image'
elif ext in NFO_EXTENSIONS:
return 'nfo'
return 'other'
def collect_media_files(self, directory):
"""收集目录中的所有媒体文件"""
media_files = {
'videos': [],
'subtitles': [],
'images': [],
'nfos': []
}
try:
for file in os.listdir(directory):
file_path = os.path.join(directory, file)
if os.path.isfile(file_path):
file_type = self.get_file_type(file)
if file_type == 'video':
media_files['videos'].append(file)
elif file_type == 'subtitle':
media_files['subtitles'].append(file)
elif file_type == 'image':
media_files['images'].append(file)
elif file_type == 'nfo':
media_files['nfos'].append(file)
except Exception as e:
logging.error(f"收集媒体文件失败: {str(e)}\n目录: {directory}\n目录权限: {oct(os.stat(directory).st_mode)[-3:] if os.path.exists(directory) else '未知'}")
return None
return media_files
def copy_related_files(self, src_dir, dst_dir, video_file, media_files):
"""复制与视频相关的文件"""
try:
video_base = os.path.splitext(video_file)[0].lower()
# 复制字幕文件
for subtitle in media_files['subtitles']:
src_path = os.path.join(src_dir, subtitle)
dst_path = os.path.join(dst_dir, subtitle)
try:
if not os.path.exists(dst_path):
shutil.copy2(src_path, dst_path)
logging.info(f"复制字幕: {dst_path}")
except Exception as e:
error_msg = f"复制字幕文件失败:\n源文件: {src_path}\n目标文件: {dst_path}\n错误信息: {str(e)}\n文件大小: {os.path.getsize(src_path) if os.path.exists(src_path) else '文件不存在'}\n文件权限: {oct(os.stat(src_path).st_mode)[-3:] if os.path.exists(src_path) else '未知'}"
logging.error(error_msg)
raise
# 复制图片文件
for image in media_files['images']:
src_path = os.path.join(src_dir, image)
dst_path = os.path.join(dst_dir, image)
try:
if not os.path.exists(dst_path):
shutil.copy2(src_path, dst_path)
logging.info(f"复制图片: {dst_path}")
except Exception as e:
error_msg = f"复制图片文件失败:\n源文件: {src_path}\n目标文件: {dst_path}\n错误信息: {str(e)}\n文件大小: {os.path.getsize(src_path) if os.path.exists(src_path) else '文件不存在'}\n文件权限: {oct(os.stat(src_path).st_mode)[-3:] if os.path.exists(src_path) else '未知'}"
logging.error(error_msg)
raise
# 复制NFO文件
for nfo in media_files['nfos']:
src_path = os.path.join(src_dir, nfo)
dst_path = os.path.join(dst_dir, os.path.splitext(video_file)[0] + '.nfo')
try:
if not os.path.exists(dst_path):
shutil.copy2(src_path, dst_path)
logging.info(f"复制NFO: {dst_path}")
except Exception as e:
error_msg = f"复制NFO文件失败:\n源文件: {src_path}\n目标文件: {dst_path}\n错误信息: {str(e)}\n文件大小: {os.path.getsize(src_path) if os.path.exists(src_path) else '文件不存在'}\n文件权限: {oct(os.stat(src_path).st_mode)[-3:] if os.path.exists(src_path) else '未知'}"
logging.error(error_msg)
raise
return True
except Exception as e:
logging.error(f"复制相关文件过程中发生错误: {str(e)}")
return False
def process_directory(self, src_dir, dst_dir):
"""处理单个目录"""
if not self.running:
return
try:
# 收集所有媒体文件
media_files = self.collect_media_files(src_dir)
if media_files is None:
self.error_count += 1
return
# 确保目标目录存在
try:
os.makedirs(dst_dir, exist_ok=True)
except Exception as e:
logging.error(f"创建目标目录失败: {dst_dir}\n错误信息: {str(e)}")
self.error_count += 1
return
# 处理视频文件
for video in media_files['videos']:
if not self.running:
return
video_path = os.path.join(src_dir, video)
base_name = os.path.splitext(video)[0]
try:
# 获取直链并创建strm文件
direct_link = self.alist_client.get_file_link(video_path)
if direct_link:
strm_path = os.path.join(dst_dir, f"{base_name}.strm")
if not os.path.exists(strm_path):
with open(strm_path, 'w', encoding='utf-8') as f:
f.write(direct_link)
logging.info(f"创建STRM文件: {strm_path}")
# 复制相关文件
if self.copy_related_files(src_dir, dst_dir, video, media_files):
self.processed_count += 1
else:
self.error_count += 1
logging.error(f"处理文件失败: {video_path}")
else:
self.error_count += 1
logging.error(f"无法获取直链: {video_path}")
except Exception as e:
self.error_count += 1
logging.error(f"处理文件失败: {video_path}\n错误信息: {str(e)}")
except Exception as e:
logging.error(f"处理目录失败: {src_dir}\n错误信息: {str(e)}")
self.error_count += 1
def scan_media_files(self):
"""扫描并处理所有媒体文件"""
start_time = datetime.now()
logging.info("开始扫描媒体文件...")
try:
for root, dirs, _ in os.walk(SOURCE_DIR):
if not self.running:
break
# 跳过隐藏目录
dirs[:] = [d for d in dirs if not d.startswith('.')]
# 处理当前目录
rel_path = os.path.relpath(root, SOURCE_DIR)
dst_dir = os.path.join(MEDIA_DIR, rel_path)
self.process_directory(root, dst_dir)
except Exception as e:
logging.error(f"扫描过程中发生错误: {str(e)}")
finally:
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
logging.info(f"扫描{'完成' if self.running else '中断'}! 成功处理: {self.processed_count} 个视频文件, 失败: {self.error_count} 个, 耗时: {duration} 秒")
def main():
processor = MediaProcessor()
try:
processor.scan_media_files()
except KeyboardInterrupt:
logging.info("收到中断信号,正在优雅退出...")
processor.stop()
if __name__ == '__main__':
main()
4、设置脚本权限:
# 设置可执行权限
chmod +x /root/scripts/generate_media_files.py
5、测试运行脚本:
# 运行脚本
python3 /root/scripts/generate_media_files.py
6、查看日志
tail -f /var/log/media_generator.log
7、设置定时任务:
编辑crontab
crontab -e
添加以下内容(每天凌晨2点执行)
0 2 * * * /usr/bin/python3 /root/scripts/generate_media_files.py
8、设置目录权限:
设置媒体目录权限
chown -R emby:emby /qingliangdata/alistod
chmod -R 755 /qingliangdata/alistod
这个脚本会:扫描/mnt/alistpan目录下的所有视频文件为每个视频文件创建对应的strm文件从TMDB获取电影信息下载海报和背景图创建包含电影信息的NFO文件保持完整的目录结构记录详细的处理日志所有文件都会保存在/qingliangdata/alistod目录下,保持原始的目录结构。需要注意:1. 确保网络连接稳定确保有足够的磁盘空间定期检查日志文件监控TMDB API的使用限制