comac_desk_app/PostProcessing/Resources/pyScripts/Recorder.py

240 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import wave
import pyaudio
import subprocess
import numpy as np
import time
import cn2an
import os
from model_ASR import *
from mesh_ASR import *
from CAD_ASR import *
from model_operator import *
from file_operation import *
#-------关联C++库---------------
import ctypes
import platform
from ctypes import *
# Choose the path prefix and file extension of the native library for the
# platform the interpreter is running on.
system = platform.system()
if system == "Windows":
    pre = "./"
    suff = ".dll"
else:
    pre = "./lib"
    suff = ".so"

# Load the native "MainWindow" library relative to the current working
# directory; `mw` is the handle used below to call into it.
libfile = ctypes.cdll.LoadLibrary
filename = pre + "MainWindow" + suff
mw = libfile(filename)
class Recorder:
    """Record one spoken command, run speech recognition on it and dispatch it.

    `run_thread` is the entry point: it records from the default input
    device, writes the audio to ``command.wav``, runs the external decoder,
    interprets the recognised text with the imported parsing helpers and
    finally reports the outcome through ``mw.voiceControlASR``.
    """

    def __init__(self, chunk=16, volume=600, gap=600, wait_time=3):
        """Store the capture settings.

        Args:
            chunk: Number of samples read from the stream per call.
            volume: Peak sample value above which recording starts.
            gap: Number of consecutive quiet chunks that ends a recording.
            wait_time: Seconds to wait for speech before giving up.
        """
        self.CHUNK = chunk
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        self.WAIT_TIME = wait_time
        self._frames = []
        self.VOLUME = volume
        self.GAP = gap
        # Not read by any method of this class.
        self.running = True

    @staticmethod
    def _apply_corrections(content, corrections):
        """Return `content` with every key of `corrections` replaced by its value.

        Replacements are applied one after another in the table's order.
        """
        for wrong, right in corrections.items():
            content = content.replace(wrong, right)
        return content

    @staticmethod
    def _bracketed_bytes(items):
        """Encode `items` as UTF-8 bytes of the form ``b"[a,b,c]"``.

        Each item is converted with `str`; an empty iterable gives ``b"[]"``.
        """
        return ('[' + ','.join(str(item) for item in items) + ']').encode('utf-8')

    def __recording(self):
        """Wait for speech, record it into `self._frames`, and report timeouts.

        Returns:
            0 when a recording ended after a long enough run of quiet chunks,
            1 when `WAIT_TIME` seconds elapsed without a completed recording.
        """
        # Start from an empty buffer so a reused instance does not prepend
        # the audio of earlier commands to this one.
        self._frames = []
        frames = self._frames
        # The silence requirement grows while speech continues; keep that
        # growth local so it does not accumulate across calls.
        gap = self.GAP
        quiet_chunks = 0
        recorded_chunks = 0
        finished = False
        overtime_flag = 0

        p = pyaudio.PyAudio()
        stream = None
        try:
            stream = p.open(format=self.FORMAT,
                            channels=self.CHANNELS,
                            rate=self.RATE,
                            input=True,
                            frames_per_buffer=self.CHUNK)
            begin = time.time()
            while not finished:
                if time.time() - begin >= self.WAIT_TIME:
                    overtime_flag = 1
                    break
                data_check = stream.read(self.CHUNK)
                peak = np.max(np.frombuffer(data_check, dtype=np.short))
                if not peak > self.VOLUME:
                    continue
                print("开始录音: ", peak)
                frames.append(data_check)
                while True:
                    data_record = stream.read(self.CHUNK)
                    frames.append(data_record)
                    samples = np.frombuffer(data_record, dtype=np.short)
                    loud_samples = np.sum(samples > self.VOLUME / 2)
                    if loud_samples < self.CHUNK / 2:
                        # Mostly quiet chunk: count it towards the end gap.
                        quiet_chunks += 1
                        if quiet_chunks > gap:
                            print("录音结束")
                            finished = True
                            break
                    else:
                        quiet_chunks = 0
                        # NOTE(review): nesting reconstructed from a flattened
                        # listing - confirm this increment belongs to the
                        # "loud chunk" branch.
                        gap += 0.1
                    recorded_chunks += 1
                    # Safety cap: leave the inner loop without marking the
                    # recording finished; the outer loop then re-checks the
                    # deadline.
                    if recorded_chunks > 10000:
                        break
        finally:
            # Release the audio device even if reading from it failed.
            try:
                if stream is not None:
                    stream.stop_stream()
                    stream.close()
            finally:
                p.terminate()
        return overtime_flag

    def __save(self, filename):
        """Write the buffered frames to `filename` as 16-bit PCM WAV.

        ``.wav`` is appended when `filename` does not already end with it.
        """
        if not filename.endswith(".wav"):
            filename = filename + ".wav"
        # The context manager closes the file even if writing fails.
        with wave.open(filename, 'wb') as wf:
            wf.setsampwidth(2)
            wf.setnchannels(self.CHANNELS)
            wf.setframerate(self.RATE)
            wf.writeframes(b''.join(self._frames))

    def __obtain_vol(self, vol):
        """Set `self.VOLUME` from the volume level `vol`.

        Levels up to 10 give 100; higher levels step through 200, 500, 700,
        1000, 1600 and 1800 at 10 (exclusive), 20, 40, 60, 80 and 90.
        """
        if vol >= 90:
            volume_system = 1800
        elif vol >= 80:
            volume_system = 1600
        elif vol >= 60:
            volume_system = 1000
        elif vol >= 40:
            volume_system = 700
        elif vol >= 20:
            volume_system = 500
        elif vol > 10:
            volume_system = 200
        else:
            volume_system = 100
        self.VOLUME = volume_system

    def run_thread(self, volume):
        """Record a command, recognise it and pass the result to `mw`.

        Args:
            volume: Volume level used to pick the recording threshold.

        The result is delivered by calling ``mw.voiceControlASR``; nothing is
        returned. When no recording completed in time, the call is made with
        the initial (empty) values.
        """
        print("音量:", volume)
        self.__obtain_vol(volume)
        flag_asr = 0
        model_param = []
        model_name = []
        model_param_str = ''
        model_name_str = ''
        model_op_str = ''
        err_message = ''
        model_op = [0]

        overtime_flag = self.__recording()
        if overtime_flag != 0:
            mw.voiceControlASR(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
            return

        # Recording finished, start recognition.
        if "asr" not in os.getcwd():
            os.chdir("pyScripts/asr")
        self.__save("command.wav")
        begin = time.time()
        # An argument list (rather than one command string) is required for
        # shell=False to work on POSIX and is equivalent on Windows.
        subprocess.run(
            ["decoder_main.exe", "--chunk_size", "16", "--wav_path", "command.wav",
             "--model_path", "final.zip", "--dict_path", "words.txt",
             "--result", "result.txt"],
            shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        end = time.time()
        print("识别时间:", end - begin)

        # Fix commonly misrecognised words.
        with open('result.txt', encoding='utf-8') as f:
            content = f.read().strip()
        print(content)
        # NOTE(review): the empty-string keys are no-ops as written (replacing
        # "" with "" leaves the text unchanged) - presumably the original
        # characters were lost; confirm against the original file.
        err_dict = {"": '', "": '', "": "", "山式": "三十", "山石": "三十", "高木": "高五", "围角": "为九",
                    "": "", "一致": "移至", "": "", "为期": "为七", "闪色": "散射"}
        content = self._apply_corrections(content, err_dict)
        content = cn2an.transform(content, "cn2an")
        print("识别结果:", content)

        # Determine whether a model was selected (by number or by name).
        flag_asr = file_operation(content, flag_asr)
        if flag_asr == 0:
            model_name, flag_asr = choose_model(content, flag_asr)

        # If a model was selected, recognise model-solving and CAD operations;
        # currently only one CAD operation can be recognised.
        if flag_asr == 100:
            content = cn2an.transform(content, "cn2an")
            model_op = Operator(content)
            model_param, flag_asr, err_message = Move(content, flag_asr)
            if flag_asr == 100:
                model_param, flag_asr, err_message = Rotation(content, flag_asr)
            if flag_asr == 100:
                model_param, flag_asr, err_message = Mirror(content, flag_asr)
            # If more than one model was selected, recognise multi-model CAD
            # operations such as union and difference.
            # NOTE(review): nesting reconstructed from a flattened listing -
            # confirm this check belongs inside the "model selected" branch.
            if len(model_name) > 1:
                flag_asr = Summation(content, flag_asr)
                if flag_asr == 100:
                    flag_asr = Difference(content, flag_asr)
                if flag_asr == 100:
                    flag_asr = Intersection(content, flag_asr)

        # If no model was selected, recognise model creation and mesh
        # generation, and whether a solve operation such as scattering applies
        # to the created model; currently only one model can be created.
        if flag_asr == 0:
            # NOTE(review): same caveat about the empty-string keys as above.
            err_dict = {"-": " ", "": "", "": "", "": "", "": "", "": "", "": "", "": "", "": "",
                        "": "", "半斤": "半径", "伴君": "半径", "半军": "半径", "搬进": "半径", "阿维": "高为", "散烁": "散射", "反射": "散射",
                        "散车": "散射"}
            content = self._apply_corrections(content, err_dict)
            model_param, flag_asr, model_op = Cube(content, model_param, flag_asr)
            if flag_asr == 0:
                model_param, flag_asr, model_op = Cylinder(content, model_param, flag_asr)
            if flag_asr == 0:
                model_param, flag_asr, model_op = Sphere(content, model_param, flag_asr)
            if flag_asr == 0:
                model_param, flag_asr, model_op = Cone(content, model_param, flag_asr)
            # NOTE(review): nesting reconstructed from a flattened listing -
            # confirm all four mesh calls are guarded by this check.
            if flag_asr == 0:
                flag_asr = SurfaceMesh(content, flag_asr)
                flag_asr = VolumeMesh(content, flag_asr)
                flag_asr = import_mesh(content, flag_asr)
                flag_asr = output_mesh(content, flag_asr)

        # Convert the lists to byte strings so they can be passed as arguments.
        model_param_str = self._bracketed_bytes(model_param)
        model_name_str = self._bracketed_bytes(model_name)
        model_op_str = self._bracketed_bytes(model_op)
        if err_message:
            err_message = bytes(err_message, encoding='utf-8')
        print(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
        # flag_asr: operation type, int
        # model_param_str: model parameters, bytes
        # model_name_str: names of the selected models, bytes
        # model_op_str: model operations, bytes
        # err_message: error message, bytes when non-empty
        mw.voiceControlASR(flag_asr, model_param_str, model_name_str, model_op_str, err_message)