import os import subprocess import sys import time from loguru import logger mariadb_port = 3417 mariadb_passowrd = '1qaz@WSX' mariadb_user='root' mariadb_init_schema = 'em_data_prod' comac_db_port=12396 class MyCustomError(Exception): """自定义异常""" def __init__(self, message="发生了一个错误"): # 调用基类的构造函数 super().__init__(message) def ensure_dir(directory): if not os.path.exists(directory): os.makedirs(directory) logger.info(f"目录 {directory} 已创建") else: logger.info(f"目录 {directory} 已存在") def get_resource_path(relative_path): """ 获取资源绝对路径,适用于开发环境和PyInstaller打包后 """ if hasattr(sys, '_MEIPASS'): # 打包后的资源路径:sys._MEIPASS + 相对路径 base_path = sys._MEIPASS else: # 开发环境的基础路径 base_path = os.path.dirname(os.path.abspath("..")) return os.path.join(base_path, relative_path) def check_service_exist(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if '1060' in result.stdout: # 服务不存在 return False return True def start_service(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if 'RUNNING' in result.stdout: logger.info(f"服务 {service_name} 已经在运行") return True else: logger.info(f"服务 {service_name} 当前未运行") logger.info("尝试启动服务...") # 尝试启动服务 start_result = subprocess.run(['sc', 'start', service_name], capture_output=True, text=True) if start_result.returncode == 0: logger.info(f"服务 {service_name} 启动成功") return True else: logger.info(f"启动服务 {service_name} 失败: {start_result.stderr}") return False def start_service_if_not_running(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if 'RUNNING' in result.stdout: logger.info(f"服务 {service_name} 已经在运行") else: logger.info(f"服务 {service_name} 当前未运行") logger.info("尝试启动服务...") # 尝试启动服务 start_result = subprocess.run(['sc', 'start', service_name], capture_output=True, text=True) if start_result.returncode == 0: logger.info(f"服务 {service_name} 启动成功") else: logger.info(f"启动服务 {service_name} 失败: {start_result.stderr}") raise MyCustomError(rf"服务 {service_name} 启动失败") def delete_old_files(directory, days=2): if not os.path.exists(directory): return # 计算时间阈值(当前时间 - days天) threshold_time = time.time() - days * 24 * 60 * 60 deleted_count = 0 try: # 遍历目录中的文件 for filename in os.listdir(directory): filepath = os.path.join(directory, filename) # 确保是文件而不是目录 if os.path.isfile(filepath): try: # 获取文件创建时间(Windows系统) creation_time = os.path.getctime(filepath) # 检查文件是否超过阈值 if creation_time < threshold_time: # 删除文件 os.remove(filepath) deleted_count += 1 logger.info(f"已删除: {filename}") except Exception as e: logger.info(f"处理文件 {filename} 时出错: {str(e)}", file=sys.stderr) logger.info(f"\n操作完成!共删除 {deleted_count} 个日志文件。") except Exception as e: logger.info(f"遍历目录时出错: {str(e)}", file=sys.stderr)