import os import subprocess import sys from loguru import logger mariadb_port = 3417 mariadb_passowrd = '1qaz@WSX' mariadb_user='root' mariadb_init_schema = 'em_data_prod' comac_db_port=12396 class MyCustomError(Exception): """自定义异常""" def __init__(self, message="发生了一个错误"): # 调用基类的构造函数 super().__init__(message) def ensure_dir(directory): if not os.path.exists(directory): os.makedirs(directory) logger.info(f"目录 {directory} 已创建") else: logger.info(f"目录 {directory} 已存在") def get_resource_path(relative_path): """ 获取资源绝对路径,适用于开发环境和PyInstaller打包后 """ if hasattr(sys, '_MEIPASS'): # 打包后的资源路径:sys._MEIPASS + 相对路径 base_path = sys._MEIPASS else: # 开发环境的基础路径 base_path = os.path.dirname(os.path.abspath("..")) return os.path.join(base_path, relative_path) def check_service_exist(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if '1060' in result.stdout: # 服务不存在 return False return True def start_service(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if 'RUNNING' in result.stdout: logger.info(f"服务 {service_name} 已经在运行") return True else: logger.info(f"服务 {service_name} 当前未运行") logger.info("尝试启动服务...") # 尝试启动服务 start_result = subprocess.run(['sc', 'start', service_name], capture_output=True, text=True) if start_result.returncode == 0: logger.info(f"服务 {service_name} 启动成功") return True else: logger.info(f"启动服务 {service_name} 失败: {start_result.stderr}") return False def start_service_if_not_running(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if 'RUNNING' in result.stdout: logger.info(f"服务 {service_name} 已经在运行") else: logger.info(f"服务 {service_name} 当前未运行") logger.info("尝试启动服务...") # 尝试启动服务 start_result = subprocess.run(['sc', 'start', service_name], capture_output=True, text=True) if start_result.returncode == 0: logger.info(f"服务 {service_name} 启动成功") else: logger.info(f"启动服务 {service_name} 失败: {start_result.stderr}") raise MyCustomError(rf"服务 {service_name} 启动失败")