rep1/bin/pyScripts/Recorder.py

240 lines
9.6 KiB
Python
Raw Normal View History

2024-11-21 09:00:42 +08:00
# -*- coding: utf-8 -*-
import wave
import pyaudio
import subprocess
import numpy as np
import time
import cn2an
import os
from model_ASR import *
from mesh_ASR import *
from CAD_ASR import *
from model_operator import *
from file_operation import *
#-------关联C++库---------------
import ctypes
import platform
from ctypes import *
system = platform.system()
if system == "Windows":
pre = "./"
suff = ".dll"
else:
pre = "./lib"
suff = ".so"
libfile = ctypes.cdll.LoadLibrary
filename = pre+"MainWindow"+suff
mw = libfile(filename)
class Recorder:
def __init__(self, chunk=16, volume=600, gap=600, wait_time=3):
self.CHUNK = chunk
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 16000
self.WAIT_TIME = wait_time
self._frames = []
self.VOLUME = volume
self.GAP = gap
self.running = True
def __recording(self):
p = pyaudio.PyAudio()
stream = p.open(format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK)
count_small = 0
count_loop = 0
end_flag = 0
begin = time.time()
overtime_flag = 0
while True:
final = time.time()
elapse = final - begin
if elapse >= self.WAIT_TIME:
overtime_flag = 1
break
data_check = stream.read(self.CHUNK)
audio_data = np.frombuffer(data_check, dtype=np.short)
temp = np.max(audio_data)
if temp > self.VOLUME:
print("开始录音: ", temp)
frames = self._frames
frames.append(data_check)
while True:
data_record = stream.read(self.CHUNK)
frames.append(data_record)
audio_data = np.frombuffer(data_record, dtype=np.short)
count_one_loop = np.sum(audio_data > self.VOLUME / 2)
if count_one_loop < self.CHUNK / 2:
count_small = count_small + 1
if count_small > self.GAP:
print("录音结束")
end_flag = 1
break
else:
count_small = 0
self.GAP = self.GAP + 0.1
count_loop = count_loop + 1
if count_loop > 10000:
break
if end_flag == 1:
break
stream.stop_stream()
stream.close()
p.terminate()
return overtime_flag
def __save(self, filename):
p = pyaudio.PyAudio()
if not filename.endswith(".wav"):
filename = filename + ".wav"
wf = wave.open(filename, 'wb')
wf.setsampwidth(2)
wf.setnchannels(self.CHANNELS)
wf.setframerate(self.RATE)
wf.writeframes(b''.join(self._frames))
wf.close()
def __obtain_vol(self, vol):
vl = vol
volume_system = 100
if 10 < vl <= 20:
volume_system = 200
if 20 <= vl < 40:
volume_system = 500
if 40 <= vl < 60:
volume_system = 700
if 60 <= vl < 80:
volume_system = 1000
if 80 <= vl < 90:
volume_system = 1600
if 90 <= vl:
volume_system = 1800
self.VOLUME = volume_system
def run_thread(self, volume):
print("音量:", volume)
self.__obtain_vol(volume)
flag_asr = 0
model_param = []
model_name = []
model_param_str = ''
model_name_str = ''
model_op_str = ''
err_message = ''
model_op = [0]
overtime_flag = self.__recording()
# 录音结束, 开始识别
if overtime_flag == 0:
if "asr" not in os.getcwd():
os.chdir("pyScripts/asr")
self.__save("command.wav")
begin = time.time()
subprocess.run(
"decoder_main.exe --chunk_size 16 --wav_path command.wav --model_path final.zip --dict_path words.txt --result result.txt",
shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
end = time.time()
print("识别时间:", end - begin)
#修改易错词
with open('result.txt', encoding='utf-8') as f:
content = f.read().strip()
print(content)
err_dict = {"": '', "": '', "": "", "山式": "三十", "山石": "三十", "高木": "高五", "围角": "为九",
"": "", "一致": "移至", "": "", "为期": "为七", "闪色": "散射"}
for i in err_dict.keys():
if i in content:
content = content.replace(i, err_dict[i])
content = cn2an.transform(content, "cn2an")
print("识别结果:", content)
# 判断是否选择了模型(某号或某个模型)
flag_asr = file_operation(content, flag_asr)
if flag_asr == 0:
model_name, flag_asr = choose_model(content, flag_asr)
# 如果选择了模型则进行模型求解识别和CAD操作识别, 目前CAD只能识别一种操作
if flag_asr == 100:
content = cn2an.transform(content, "cn2an")
model_op = Operator(content)
model_param, flag_asr, err_message = Move(content, flag_asr)
if flag_asr == 100:
model_param, flag_asr, err_message = Rotation(content, flag_asr)
if flag_asr == 100:
model_param, flag_asr, err_message = Mirror(content, flag_asr)
# 如果选择的模型大于1个则进行模型求和求差等多模型CAD交互操作识别
if len(model_name) > 1:
flag_asr = Summation(content, flag_asr)
if flag_asr == 100:
flag_asr = Difference(content, flag_asr)
if flag_asr == 100:
flag_asr = Intersection(content, flag_asr)
# 如果没有选择模型,则进行模型创建以及生成网格识别,同时识别是否对生成的模型进行散射等求解操作,目前只能生成一个模型
if flag_asr == 0:
err_dict = {"-": " ", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "",
"": "", "半斤": "半径", "伴君": "半径", "半军": "半径", "搬进": "半径", "阿维": "高为", "散烁": "散射", "反射": "散射",
"散车": "散射"}
for i in err_dict.keys():
if i in content:
content = content.replace(i, err_dict[i])
model_param, flag_asr, model_op = Cube(content, model_param, flag_asr)
if flag_asr == 0:
model_param, flag_asr, model_op = Cylinder(content, model_param, flag_asr)
if flag_asr == 0:
model_param, flag_asr, model_op = Sphere(content, model_param, flag_asr)
if flag_asr == 0:
model_param, flag_asr, model_op = Cone(content, model_param, flag_asr)
if flag_asr == 0:
flag_asr = SurfaceMesh(content, flag_asr)
flag_asr = VolumeMesh(content, flag_asr)
flag_asr = import_mesh(content, flag_asr)
flag_asr = output_mesh(content, flag_asr)
# 转换list至byte str, 方便传参
for i in model_param:
temp = i
model_param_str = model_param_str + temp + ','
model_param_str = list(model_param_str)
if model_param_str:
model_param_str.pop(-1)
model_param_str = ''.join(model_param_str)
model_param_str = '[' + model_param_str + ']'
model_param_str = bytes(model_param_str, encoding='utf-8')
for i in model_name:
temp = i
model_name_str = model_name_str + temp + ','
model_name_str = list(model_name_str)
if model_name_str:
model_name_str.pop(-1)
model_name_str = ''.join(model_name_str)
model_name_str = '[' + model_name_str + ']'
model_name_str = bytes(model_name_str, encoding='utf-8')
for i in model_op:
temp = str(i)
model_op_str = model_op_str + temp + ','
model_op_str = list(model_op_str)
print(model_op_str)
if model_op_str:
model_op_str.pop(-1)
model_op_str = ''.join(model_op_str)
model_op_str = '[' + model_op_str + ']'
model_op_str = bytes(model_op_str, encoding='utf-8')
if err_message:
err_message = bytes(err_message, encoding='utf-8')
print(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
# model_name为选择的模型名称,bytes str;
# model_param_str为模型参数bytes str;
# model_op为模型操作bytes str;
# flag_asr为操作类型 int
# err_message为错误信息, bytes str
mw.voiceControlASR(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
else:
mw.voiceControlASR(flag_asr, model_param_str, model_name_str, model_op_str, err_message)