import os import shutil import subprocess import sys import time import zipfile from datetime import datetime from pathlib import Path from loguru import logger def ensure_dir(directory): if not os.path.exists(directory): os.makedirs(directory) logger.info(f"目录 {directory} 已创建") else: logger.info(f"目录 {directory} 已存在") def get_resource_path(relative_path): """ 获取资源绝对路径,适用于开发环境和PyInstaller打包后 """ if hasattr(sys, '_MEIPASS'): # 打包后的资源路径:sys._MEIPASS + 相对路径 base_path = sys._MEIPASS else: # 开发环境的基础路径 base_path = os.path.dirname(os.path.abspath(".")) return os.path.join(base_path, relative_path) def check_service_exist(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if '1060' in result.stdout: # 服务不存在 return False return True def start_service(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if 'RUNNING' in result.stdout: logger.info(f"服务 {service_name} 已经在运行") return True else: logger.info(f"服务 {service_name} 当前未运行") logger.info("尝试启动服务...") # 尝试启动服务 start_result = subprocess.run(['sc', 'start', service_name], capture_output=True, text=True) if start_result.returncode == 0: logger.info(f"服务 {service_name} 启动成功") return True else: logger.info(f"启动服务 {service_name} 失败: {start_result.stderr}") return False def unzip_file(zip_file_path): new_path = get_resource_path(os.path.join('datas', 'mariadb.zip')) with zipfile.ZipFile(str(new_path), 'r') as zip_ref: zip_ref.extractall(zip_file_path) # 将文件解压到指定路径 def start_service_if_not_running(service_name): result = subprocess.run(['sc', 'query', service_name], capture_output=True, text=True) if 'RUNNING' in result.stdout: logger.info(f"服务 {service_name} 已经在运行") else: logger.info(f"服务 {service_name} 当前未运行") logger.info("尝试启动服务...") # 尝试启动服务 start_result = subprocess.run(['sc', 'start', service_name], capture_output=True, text=True) if start_result.returncode == 0: logger.info(f"服务 {service_name} 启动成功") else: logger.info(f"启动服务 {service_name} 失败: {start_result.stderr}") raise MyCustomError(rf"服务 {service_name} 启动失败") class MyCustomError(Exception): """自定义异常""" def __init__(self, message="发生了一个错误"): # 调用基类的构造函数 super().__init__(message) class InstallMariaDb: def __init__(self): self.service_name = 'ComacMariaDB' self.root_path = r'D:\database' self.mariadb_data_path = r'D:\database\mariadb\data' self.mariadb_port = 3417 self.passowrd = '1qaz@WSX' self.init_schema = 'em_data_prod' self.init_sql = r'D:/database/mariadb/data/init.sql' pass def start_install_mariadb(self): existStatus = check_service_exist(self.service_name) if existStatus: start_service_if_not_running(self.service_name) return logger.info("开始安装MariaDB") ensure_dir(self.root_path) logger.info("开始解压文件") unzip_file(self.root_path) logger.info(f"开始注册服务 {self.service_name}") self.__register_service() logger.info("启动数据库") self.__start_mariadb() logger.info("数据库用户初始化") self.__init_db_user() logger.info("数据库初始化") self.__init_db() logger.info("设置Java运行环境") self.__set_java_env() # logger.info("生成脚本文件") # self.__set_bat() logger.info("安装完成,10秒后自动退出") time.sleep(10) pass def __start_mariadb(self): command = 'net start ' + self.service_name with os.popen(command) as stream: res = stream.read() logger.info(res) def __register_service(self): args1 = self.root_path + '/mariadb/bin/mysql_install_db.exe' args2 = '--datadir=' + self.mariadb_data_path args3 = '--service=' + self.service_name args4 = '--password=' + self.passowrd args5 = '--port=' + str(self.mariadb_port) res = subprocess.run([args1, args2, args3, args4, args5], text=True, capture_output=True) logger.info(res.stdout) return res def __init_db_user(self): command1 = fr'{self.root_path}\mariadb\bin\mysql -u root -p{self.passowrd} -P {self.mariadb_port} -e "create database if not exists {self.init_schema}; use {self.init_schema};"' with os.popen(command1) as stream: res = stream.read() logger.info(res) def __init_db(self): new_path = get_resource_path(os.path.join('datas', 'init.sql')) shutil.copy(str(new_path), self.mariadb_data_path) command2 = fr'{self.root_path}\mariadb\bin\mysql --no-defaults -u root -p{self.passowrd} -P {self.mariadb_port} {self.init_schema} < {self.init_sql}' with os.popen(command2) as stream: res2 = stream.read() logger.info(res2) os.remove(self.init_sql) pass def __set_java_env(self): new_path = get_resource_path(os.path.join('datas', 'jdk.zip')) with zipfile.ZipFile(str(new_path), 'r') as zip_ref: zip_ref.extractall(self.root_path) # 将文件解压到指定路径 pass # # def __set_bat(self): # new_path = get_resource_path(os.path.join('datas', 'start.bat')) # shutil.copy(str(new_path), self.root_path) pass class InstallComacDb: def __init__(self): self.project_dir = get_resource_path(os.path.dirname(Path(__file__).parent.absolute())) self.app_log_dir = 'D:/database/logs' self.comac_db_running_port = 12396 self.jar_path = get_resource_path(os.path.join("datas", "electromagnetic.jar")) self.new_java_path = 'D:/database/jdk/bin/java.exe' self.url = f'http://127.0.0.1:{self.comac_db_running_port}/index' self.service_name = "comacDatabase" self.service_description = "数据库组件服务" # 路径配置 self.datas_dir = get_resource_path(os.path.join('datas')) # NSSM可执行文件路径 self.nssm_exe = get_resource_path(os.path.join('datas', 'nssm.exe')) # Java和JAR文件路径 self.java_path = "D:/database/jdk/bin/java.exe" ensure_dir(self.app_log_dir) pass def start_comac_db(self): self.__delete_old_files(self.app_log_dir) existStatus = check_service_exist(self.service_name) if existStatus: start_service_if_not_running(self.service_name) return self.__register_and_start_service() pass def __delete_old_files(self, directory, days=2): # 计算时间阈值(当前时间 - days天) threshold_time = time.time() - days * 24 * 60 * 60 deleted_count = 0 try: # 遍历目录中的文件 for filename in os.listdir(directory): filepath = os.path.join(directory, filename) # 确保是文件而不是目录 if os.path.isfile(filepath): try: # 获取文件创建时间(Windows系统) creation_time = os.path.getctime(filepath) # 检查文件是否超过阈值 if creation_time < threshold_time: # 删除文件 os.remove(filepath) deleted_count += 1 logger.info(f"已删除: {filename}") except Exception as e: logger.info(f"处理文件 {filename} 时出错: {str(e)}", file=sys.stderr) logger.info(f"\n操作完成!共删除 {deleted_count} 个日志文件。") except Exception as e: logger.info(f"遍历目录时出错: {str(e)}", file=sys.stderr) def __register_and_start_service(self): # 服务配置 # 构建NSSM安装命令 formatted_time = datetime.now().strftime("%Y%m%d%H%M%S") cmd = [ str(self.nssm_exe), "install", self.service_name, self.java_path, f"-jar {self.jar_path} --logging.file.path={self.app_log_dir} --logging.file.name={self.app_log_dir}/app_{formatted_time}.log" ] # 执行安装命令 print("Installing service...") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print("Error installing service:") print(result.stderr) return False print("Service installed successfully!") # 设置服务显示名称和描述 subprocess.run([str(self.nssm_exe), "set", self.service_name, "DisplayName", self.service_name]) subprocess.run([str(self.nssm_exe), "set", self.service_name, "Description", self.service_description]) # 设置启动目录(可选,但推荐) # subprocess.run([str(nssm_exe), "set", service_name, "AppDirectory", str(datas_dir)]) # 设置启动类型为自动(可选) subprocess.run([str(self.nssm_exe), "set", self.service_name, "Start", "SERVICE_AUTO_START"]) print("Service configuration completed.") print(f"You can now start the service with: net start {self.service_name}") subprocess.run(rf"net start {self.service_name}") time.sleep(10) if __name__ == '__main__': InstallMariaDb().start_install_mariadb() InstallComacDb().start_comac_db() pass