通过115挂载onedrive网盘,然后rclone挂载到云服务器,在使用脚本生成strm文件

1、安装必要的软件以及包

# 更新系统包
apt update

# 安装Python3和pip
apt install -y python3 python3-pip

# 安装需要的Python包
pip3 install tmdbsimple requests parse-torrent-name

2、创建工作和日志目录

# 创建脚本目录
mkdir -p /root/scripts

# 创建日志文件
touch /var/log/media_generator.log
chmod 644 /var/log/media_generator.log

3、创建主脚本文件:

# 创建并编辑脚本文件
nano /root/scripts/generate_media_files.py

将以下完整代码复制到编辑器中:

#!/usr/bin/env python3

import os
import re
import json
import shutil
import requests
import logging
from pathlib import Path
from datetime import datetime
from urllib.parse import quote

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/media_generator.log'),
        logging.StreamHandler()
    ]
)

# Alist配置
ALIST_CONFIG = {
    'host': 'http://127.0.0.1:5244',
    'username': 'alist登录账号',
    'password': 'alist登录密码'
}

# 其他配置
SOURCE_DIR = '/mnt/alistpan'
MEDIA_DIR = '/qingliangdata/alistod'

# 文件扩展名配置
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.avi', '.m4v', '.ts', '.wmv', '.m2ts', '.mov'}
SUBTITLE_EXTENSIONS = {'.srt', '.ass', '.ssa', '.sub', '.idx', '.sup', '.smi'}
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.tbn', '.gif'}
NFO_EXTENSIONS = {'.nfo', '.info'}

class AlistClient:
    def __init__(self, config):
        self.host = config['host']
        self.username = config['username']
        self.password = config['password']
        self.token = None
        self.token_expire = 0
        
    def login(self):
        """登录Alist获取token"""
        try:
            response = requests.post(
                f"{self.host}/api/auth/login",
                json={
                    "username": self.username,
                    "password": self.password
                }
            )
            data = response.json()
            if data['code'] == 200:
                self.token = data['data']['token']
                logging.info("Alist登录成功")
                return True
            else:
                logging.error(f"Alist登录失败: {data['message']}")
                return False
        except Exception as e:
            logging.error(f"Alist登录异常: {str(e)}")
            return False

    def get_file_link(self, file_path):
        """获取文件的直链"""
        if not self.token:
            if not self.login():
                return None

        try:
            # 移除开头的/mnt/alistpan
            alist_path = file_path.replace('/mnt/alistpan/', '/')
            
            # 获取下载链接
            response = requests.post(
                f"{self.host}/api/fs/link",
                headers={"Authorization": self.token},
                json={
                    "path": alist_path,
                    "password": "",
                }
            )
            data = response.json()
            
            if data['code'] == 200:
                link_data = data['data']
                # 检查返回的数据是否是字典类型并包含url字段
                if isinstance(link_data, dict) and 'url' in link_data:
                    raw_url = link_data['url']
                    logging.info(f"获取到下载链接URL: {raw_url}")
                    return raw_url
                else:
                    logging.error(f"无效的链接数据格式: {link_data}")
                    return None
            elif data['code'] == 401:
                # token过期,重新登录
                if self.login():
                    return self.get_file_link(file_path)
            else:
                logging.error(f"获取直链失败: {data['message']}, 文件路径: {file_path}")
                return None
                
        except Exception as e:
            logging.error(f"获取直链异常: {str(e)}, 文件路径: {file_path}")
            return None

class MediaProcessor:
    def __init__(self):
        self.processed_count = 0
        self.error_count = 0
        self.alist_client = AlistClient(ALIST_CONFIG)
        self.running = True
        
    def stop(self):
        """停止处理"""
        self.running = False
        logging.info("正在停止处理...")
        
    def get_file_type(self, filename):
        """判断文件类型"""
        ext = os.path.splitext(filename.lower())[1]
        if ext in VIDEO_EXTENSIONS:
            return 'video'
        elif ext in SUBTITLE_EXTENSIONS:
            return 'subtitle'
        elif ext in IMAGE_EXTENSIONS:
            return 'image'
        elif ext in NFO_EXTENSIONS:
            return 'nfo'
        return 'other'

    def collect_media_files(self, directory):
        """收集目录中的所有媒体文件"""
        media_files = {
            'videos': [],
            'subtitles': [],
            'images': [],
            'nfos': []
        }
        
        try:
            for file in os.listdir(directory):
                file_path = os.path.join(directory, file)
                if os.path.isfile(file_path):
                    file_type = self.get_file_type(file)
                    if file_type == 'video':
                        media_files['videos'].append(file)
                    elif file_type == 'subtitle':
                        media_files['subtitles'].append(file)
                    elif file_type == 'image':
                        media_files['images'].append(file)
                    elif file_type == 'nfo':
                        media_files['nfos'].append(file)
        except Exception as e:
            logging.error(f"收集媒体文件失败: {str(e)}\n目录: {directory}\n目录权限: {oct(os.stat(directory).st_mode)[-3:] if os.path.exists(directory) else '未知'}")
            return None
        
        return media_files

    def copy_related_files(self, src_dir, dst_dir, video_file, media_files):
        """复制与视频相关的文件"""
        try:
            video_base = os.path.splitext(video_file)[0].lower()
            
            # 复制字幕文件
            for subtitle in media_files['subtitles']:
                src_path = os.path.join(src_dir, subtitle)
                dst_path = os.path.join(dst_dir, subtitle)
                try:
                    if not os.path.exists(dst_path):
                        shutil.copy2(src_path, dst_path)
                        logging.info(f"复制字幕: {dst_path}")
                except Exception as e:
                    error_msg = f"复制字幕文件失败:\n源文件: {src_path}\n目标文件: {dst_path}\n错误信息: {str(e)}\n文件大小: {os.path.getsize(src_path) if os.path.exists(src_path) else '文件不存在'}\n文件权限: {oct(os.stat(src_path).st_mode)[-3:] if os.path.exists(src_path) else '未知'}"
                    logging.error(error_msg)
                    raise
            
            # 复制图片文件
            for image in media_files['images']:
                src_path = os.path.join(src_dir, image)
                dst_path = os.path.join(dst_dir, image)
                try:
                    if not os.path.exists(dst_path):
                        shutil.copy2(src_path, dst_path)
                        logging.info(f"复制图片: {dst_path}")
                except Exception as e:
                    error_msg = f"复制图片文件失败:\n源文件: {src_path}\n目标文件: {dst_path}\n错误信息: {str(e)}\n文件大小: {os.path.getsize(src_path) if os.path.exists(src_path) else '文件不存在'}\n文件权限: {oct(os.stat(src_path).st_mode)[-3:] if os.path.exists(src_path) else '未知'}"
                    logging.error(error_msg)
                    raise
            
            # 复制NFO文件
            for nfo in media_files['nfos']:
                src_path = os.path.join(src_dir, nfo)
                dst_path = os.path.join(dst_dir, os.path.splitext(video_file)[0] + '.nfo')
                try:
                    if not os.path.exists(dst_path):
                        shutil.copy2(src_path, dst_path)
                        logging.info(f"复制NFO: {dst_path}")
                except Exception as e:
                    error_msg = f"复制NFO文件失败:\n源文件: {src_path}\n目标文件: {dst_path}\n错误信息: {str(e)}\n文件大小: {os.path.getsize(src_path) if os.path.exists(src_path) else '文件不存在'}\n文件权限: {oct(os.stat(src_path).st_mode)[-3:] if os.path.exists(src_path) else '未知'}"
                    logging.error(error_msg)
                    raise
            
            return True
            
        except Exception as e:
            logging.error(f"复制相关文件过程中发生错误: {str(e)}")
            return False

    def process_directory(self, src_dir, dst_dir):
        """处理单个目录"""
        if not self.running:
            return
            
        try:
            # 收集所有媒体文件
            media_files = self.collect_media_files(src_dir)
            if media_files is None:
                self.error_count += 1
                return
                
            # 确保目标目录存在
            try:
                os.makedirs(dst_dir, exist_ok=True)
            except Exception as e:
                logging.error(f"创建目标目录失败: {dst_dir}\n错误信息: {str(e)}")
                self.error_count += 1
                return
            
            # 处理视频文件
            for video in media_files['videos']:
                if not self.running:
                    return
                    
                video_path = os.path.join(src_dir, video)
                base_name = os.path.splitext(video)[0]
                
                try:
                    # 获取直链并创建strm文件
                    direct_link = self.alist_client.get_file_link(video_path)
                    if direct_link:
                        strm_path = os.path.join(dst_dir, f"{base_name}.strm")
                        if not os.path.exists(strm_path):
                            with open(strm_path, 'w', encoding='utf-8') as f:
                                f.write(direct_link)
                            logging.info(f"创建STRM文件: {strm_path}")
                        
                        # 复制相关文件
                        if self.copy_related_files(src_dir, dst_dir, video, media_files):
                            self.processed_count += 1
                        else:
                            self.error_count += 1
                            logging.error(f"处理文件失败: {video_path}")
                    else:
                        self.error_count += 1
                        logging.error(f"无法获取直链: {video_path}")
                
                except Exception as e:
                    self.error_count += 1
                    logging.error(f"处理文件失败: {video_path}\n错误信息: {str(e)}")
                
        except Exception as e:
            logging.error(f"处理目录失败: {src_dir}\n错误信息: {str(e)}")
            self.error_count += 1

    def scan_media_files(self):
        """扫描并处理所有媒体文件"""
        start_time = datetime.now()
        
        logging.info("开始扫描媒体文件...")
        
        try:
            for root, dirs, _ in os.walk(SOURCE_DIR):
                if not self.running:
                    break
                    
                # 跳过隐藏目录
                dirs[:] = [d for d in dirs if not d.startswith('.')]
                
                # 处理当前目录
                rel_path = os.path.relpath(root, SOURCE_DIR)
                dst_dir = os.path.join(MEDIA_DIR, rel_path)
                self.process_directory(root, dst_dir)
                
        except Exception as e:
            logging.error(f"扫描过程中发生错误: {str(e)}")
        finally:
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            logging.info(f"扫描{'完成' if self.running else '中断'}! 成功处理: {self.processed_count} 个视频文件, 失败: {self.error_count} 个, 耗时: {duration} 秒")

def main():
    processor = MediaProcessor()
    try:
        processor.scan_media_files()
    except KeyboardInterrupt:
        logging.info("收到中断信号,正在优雅退出...")
        processor.stop()

if __name__ == '__main__':
    main()

4、设置脚本权限:

# 设置可执行权限
chmod +x /root/scripts/generate_media_files.py

5、测试运行脚本:

# 运行脚本 
python3 /root/scripts/generate_media_files.py

6、查看日志

tail -f /var/log/media_generator.log

7、设置定时任务:

编辑crontab

crontab -e

添加以下内容(每天凌晨2点执行)

0 2 * * * /usr/bin/python3 /root/scripts/generate_media_files.py

8、设置目录权限:

设置媒体目录权限

chown -R emby:emby /qingliangdata/alistod
chmod -R 755 /qingliangdata/alistod

这个脚本会:扫描/mnt/alistpan目录下的所有视频文件为每个视频文件创建对应的strm文件从TMDB获取电影信息下载海报和背景图创建包含电影信息的NFO文件保持完整的目录结构记录详细的处理日志所有文件都会保存在/qingliangdata/alistod目录下,保持原始的目录结构。需要注意:1. 确保网络连接稳定确保有足够的磁盘空间定期检查日志文件监控TMDB API的使用限制

By 行政