"""Install MariaDB and the database component as Windows services.

The script unpacks the bundled ``datas/mariadb.zip`` and ``datas/jdk.zip``
archives, registers MariaDB as a Windows service, loads ``datas/init.sql``
into it, and finally registers ``datas/electromagnetic.jar`` as a service
through the bundled ``nssm.exe``.
"""
import os
import shutil
import subprocess
import sys
import time
import zipfile
from datetime import datetime
from pathlib import Path
import tkinter as tk
from tkinter import filedialog, messagebox

from loguru import logger

# Root directory every component is installed under by default.
DEFAULT_ROOT_PATH = 'D:/database'

# Number that ``sc query`` reports for a service that does not exist.
SERVICE_NOT_FOUND_CODE = 1060


class MyCustomError(Exception):
    """Raised when a required Windows service cannot be started."""

    def __init__(self, message="发生了一个错误"):
        super().__init__(message)


def _run(command, **kwargs):
    """Run *command* (an argument list) without a shell, capturing its output.

    Output is decoded as text with undecodable bytes replaced, so unexpected
    console output cannot abort the installer with a ``UnicodeDecodeError``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.
    """
    return subprocess.run(command, capture_output=True, text=True,
                          errors='replace', **kwargs)


def _run_logged(command, **kwargs):
    """Run *command* like :func:`_run` and log what it printed."""
    result = _run(command, **kwargs)
    logger.info(result.stdout)
    if result.stderr:
        logger.warning(result.stderr)
    return result


def ensure_dir(directory):
    """Create *directory* (including parents) unless the path already exists."""
    if os.path.exists(directory):
        logger.info(f"目录 {directory} 已存在")
        return
    # exist_ok guards against the directory appearing between check and create.
    os.makedirs(directory, exist_ok=True)
    logger.info(f"目录 {directory} 已创建")


def get_resource_path(relative_path):
    """Return the absolute path of a bundled resource.

    When running from a PyInstaller bundle the path is resolved against
    ``sys._MEIPASS``; otherwise it is resolved against the parent of the
    current working directory.
    """
    base_path = getattr(sys, '_MEIPASS', None)
    if base_path is None:
        base_path = os.path.dirname(os.path.abspath("."))
    return os.path.join(base_path, relative_path)


def check_service_exist(service_name):
    """Return ``True`` unless ``sc query`` reports the service as missing."""
    result = _run(['sc', 'query', service_name])
    missing = (result.returncode == SERVICE_NOT_FOUND_CODE
               or str(SERVICE_NOT_FOUND_CODE) in result.stdout)
    return not missing


def start_service(service_name):
    """Make sure *service_name* is running, starting it when necessary.

    Returns:
        ``True`` when the service is already running or ``sc start``
        succeeded, ``False`` when ``sc start`` failed.
    """
    if 'RUNNING' in _run(['sc', 'query', service_name]).stdout:
        logger.info(f"服务 {service_name} 已经在运行")
        return True

    logger.info(f"服务 {service_name} 当前未运行")
    logger.info("尝试启动服务...")
    start_result = _run(['sc', 'start', service_name])
    if start_result.returncode == 0:
        logger.info(f"服务 {service_name} 启动成功")
        return True

    # Fall back to stdout so the failure reason is logged even when the
    # command wrote nothing to stderr.
    detail = start_result.stderr.strip() or start_result.stdout.strip()
    logger.error(f"启动服务 {service_name} 失败: {detail}")
    return False


def start_service_if_not_running(service_name):
    """Like :func:`start_service`, but raise instead of returning ``False``.

    Raises:
        MyCustomError: if the service could not be started.
    """
    if not start_service(service_name):
        raise MyCustomError(rf"服务 {service_name} 启动失败")


def unzip_file(zip_file_path, archive_name='mariadb.zip'):
    """Extract a bundled archive from ``datas`` into a directory.

    Args:
        zip_file_path: destination directory (despite the name, this is not
            the archive itself).
        archive_name: file name of the archive inside the bundled ``datas``
            directory.
    """
    archive = get_resource_path(os.path.join('datas', archive_name))
    with zipfile.ZipFile(str(archive), 'r') as zip_ref:
        zip_ref.extractall(zip_file_path)


class InstallMariaDb:
    """Unpack MariaDB, register it as a Windows service and load the schema."""

    def __init__(self, root_path=DEFAULT_ROOT_PATH):
        """Derive every installation path from *root_path*."""
        self.service_name = 'ComacMariaDB'
        self.root_path = root_path
        self.mariadb_data_path = f'{root_path}/mariadb/data'
        self.mariadb_port = 3417
        # NOTE(review): the root password is hard-coded in source; consider
        # supplying it at install time. The attribute name is misspelled but
        # kept unchanged so existing readers of it keep working.
        self.passowrd = '1qaz@WSX'
        self.init_schema = 'em_data_prod'
        self.init_sql = f'{self.mariadb_data_path}/init.sql'

    def start_install_mariadb(self):
        """Install MariaDB, or just start it when the service already exists.

        Raises:
            MyCustomError: if the service exists but cannot be started.
        """
        if check_service_exist(self.service_name):
            start_service_if_not_running(self.service_name)
            return

        logger.info("开始安装MariaDB")
        ensure_dir(self.root_path)
        logger.info("开始解压文件")
        unzip_file(self.root_path)
        logger.info(f"开始注册服务 {self.service_name}")
        self.__register_service()
        logger.info("启动数据库")
        self.__start_mariadb()
        logger.info("数据库用户初始化")
        self.__init_db_user()
        logger.info("数据库初始化")
        self.__init_db()
        logger.info("设置Java运行环境")
        self.__set_java_env()
        logger.info("安装完成,10秒后进行数据库服务安装")
        time.sleep(10)

    def __mysql_exe(self):
        """Return the path of the extracted ``mysql`` command-line client."""
        return f'{self.root_path}/mariadb/bin/mysql.exe'

    def __start_mariadb(self):
        """Start the MariaDB service with ``net start`` and log the output."""
        _run_logged(['net', 'start', self.service_name])

    def __register_service(self):
        """Run ``mysql_install_db.exe`` to create the data dir and service.

        Returns:
            The ``subprocess.CompletedProcess`` of the installer call.
        """
        return _run_logged([
            self.root_path + '/mariadb/bin/mysql_install_db.exe',
            '--datadir=' + self.mariadb_data_path,
            '--service=' + self.service_name,
            '--password=' + self.passowrd,
            '--port=' + str(self.mariadb_port),
        ])

    def __init_db_user(self):
        """Create the initial schema if it does not exist yet."""
        statement = (f'create database if not exists {self.init_schema}; '
                     f'use {self.init_schema};')
        _run_logged([
            self.__mysql_exe(),
            '-u', 'root',
            f'-p{self.passowrd}',
            '-P', str(self.mariadb_port),
            '-e', statement,
        ])

    def __init_db(self):
        """Load the bundled ``init.sql`` into the initial schema.

        The script is copied into the data directory, fed to the client on
        stdin and removed again, even when the import fails.
        """
        bundled_sql = get_resource_path(os.path.join('datas', 'init.sql'))
        shutil.copy(str(bundled_sql), self.mariadb_data_path)
        command = [
            self.__mysql_exe(),
            '--no-defaults',
            '-u', 'root',
            f'-p{self.passowrd}',
            '-P', str(self.mariadb_port),
            self.init_schema,
        ]
        try:
            with open(self.init_sql, 'rb') as script:
                _run_logged(command, stdin=script)
        finally:
            os.remove(self.init_sql)

    def __set_java_env(self):
        """Extract the bundled ``jdk.zip`` into the installation root."""
        unzip_file(self.root_path, 'jdk.zip')


class InstallComacDb:
    """Register the bundled JAR as an NSSM-managed Windows service."""

    def __init__(self, root_path=DEFAULT_ROOT_PATH):
        """Resolve bundled resources and create the log directory."""
        self.project_dir = get_resource_path(
            os.path.dirname(Path(__file__).parent.absolute()))
        self.app_log_dir = f'{root_path}/logs'
        self.comac_db_running_port = 12396
        self.jar_path = get_resource_path(
            os.path.join("datas", "electromagnetic.jar"))
        self.new_java_path = f'{root_path}/jdk/bin/java.exe'
        self.url = f'http://127.0.0.1:{self.comac_db_running_port}/index'
        self.service_name = "ComacDatabase"
        self.service_description = "数据库组件服务"
        # Bundled resource directory and the NSSM executable inside it.
        self.datas_dir = get_resource_path(os.path.join('datas'))
        self.nssm_exe = get_resource_path(os.path.join('datas', 'nssm.exe'))
        # Java executable extracted from the bundled JDK archive.
        self.java_path = f'{root_path}/jdk/bin/java.exe'
        ensure_dir(self.app_log_dir)

    def start_comac_db(self):
        """Prune old logs, drop any previous service and register a new one."""
        self.__delete_old_files(self.app_log_dir)
        self.__remove_pre_service()
        self.__register_and_start_service()
        logger.info("运行完成,10秒钟后自动退出")
        time.sleep(10)

    def __delete_old_files(self, directory, days=2):
        """Delete regular files in *directory* older than *days* days.

        Age is measured with ``os.path.getctime``. Failures are logged and
        never raised, so cleanup cannot abort the installation.
        """
        threshold_time = time.time() - days * 24 * 60 * 60
        deleted_count = 0
        try:
            filenames = os.listdir(directory)
        except OSError as e:
            logger.error(f"遍历目录时出错: {str(e)}")
            return

        for filename in filenames:
            filepath = os.path.join(directory, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                if os.path.getctime(filepath) < threshold_time:
                    os.remove(filepath)
                    deleted_count += 1
                    logger.info(f"已删除: {filename}")
            except OSError as e:
                logger.error(f"处理文件 {filename} 时出错: {str(e)}")
        logger.info(f"\n操作完成!共删除 {deleted_count} 个日志文件。")

    def __register_and_start_service(self):
        """Install the JAR as a service via NSSM, configure it and start it.

        Returns:
            ``False`` when ``nssm install`` fails, ``True`` otherwise.
        """
        formatted_time = datetime.now().strftime("%Y%m%d%H%M%S")
        nssm = str(self.nssm_exe)
        cmd = [
            nssm, "install", self.service_name, self.java_path,
            f"-jar {self.jar_path} --logging.file.path={self.app_log_dir} --logging.file.name={self.app_log_dir}/app_{formatted_time}.log"
        ]

        logger.info("Installing service...")
        result = _run(cmd)
        if result.returncode != 0:
            logger.info("Error installing service:")
            logger.info(result.stderr)
            return False
        logger.info("Service installed successfully!")

        # Display name, description and automatic start-up.
        subprocess.run([nssm, "set", self.service_name,
                        "DisplayName", self.service_name])
        subprocess.run([nssm, "set", self.service_name,
                        "Description", self.service_description])
        subprocess.run([nssm, "set", self.service_name,
                        "Start", "SERVICE_AUTO_START"])
        logger.info("Service configuration completed.")

        subprocess.run(["net", "start", self.service_name])
        time.sleep(10)
        return True

    def __remove_pre_service(self):
        """Stop and delete a previously registered service, ignoring errors."""
        logger.info("清理历史服务")
        _run(["sc", "stop", self.service_name])
        _run(["sc", "delete", self.service_name])
        time.sleep(5)


def select_folder():
    """Show a directory chooser and return the selection.

    Returns:
        The chosen directory, or ``None`` when the dialog was cancelled.
    """
    # The hidden root window is intentionally left alive: a later message box
    # reuses it instead of creating a new, visible default window.
    root = tk.Tk()
    root.withdraw()

    folder_path = filedialog.askdirectory(
        title="请选择安装文件夹(安装目录必须为空文件夹)")
    if not folder_path:
        logger.info("用户取消了选择")
        return None
    return folder_path


def select_install_folder():
    """Ask for an installation folder and insist that it is empty.

    Returns:
        The selected folder, or ``None`` when the user cancelled. The process
        exits with status 1 when the selected folder is not empty.
    """
    folder_path = select_folder()
    if folder_path is None:
        return None

    if os.path.exists(folder_path) and os.listdir(folder_path):
        messagebox.showerror("错误", "安装文件夹非空,退出安装")
        # sys.exit is always available; the bare ``exit`` helper is injected
        # by the ``site`` module and may be missing in frozen builds.
        sys.exit(1)

    logger.info(f"选择的文件夹是: {folder_path}")
    return folder_path


def main():
    """Run the folder prompt, then the MariaDB and component installers."""
    install_dir = select_install_folder()
    # NOTE(review): the selected folder is only logged; both installers still
    # use DEFAULT_ROOT_PATH. Confirm whether it should be passed as root_path.
    logger.info(install_dir)
    InstallMariaDb().start_install_mariadb()
    InstallComacDb().start_comac_db()


if __name__ == '__main__':
    main()