240 lines
9.6 KiB
Python
240 lines
9.6 KiB
Python
# -*- coding: utf-8 -*-
|
||
import wave
|
||
import pyaudio
|
||
import subprocess
|
||
import numpy as np
|
||
import time
|
||
import cn2an
|
||
import os
|
||
from model_ASR import *
|
||
from mesh_ASR import *
|
||
from CAD_ASR import *
|
||
from model_operator import *
|
||
from file_operation import *
|
||
#-------关联C++库---------------
|
||
import ctypes
|
||
import platform
|
||
from ctypes import *
|
||
system = platform.system()
|
||
if system == "Windows":
|
||
pre = "./"
|
||
suff = ".dll"
|
||
else:
|
||
pre = "./lib"
|
||
suff = ".so"
|
||
|
||
libfile = ctypes.cdll.LoadLibrary
|
||
filename = pre+"MainWindow"+suff
|
||
|
||
mw = libfile(filename)
|
||
|
||
|
||
|
||
class Recorder:
|
||
def __init__(self, chunk=16, volume=600, gap=600, wait_time=3):
|
||
self.CHUNK = chunk
|
||
self.FORMAT = pyaudio.paInt16
|
||
self.CHANNELS = 1
|
||
self.RATE = 16000
|
||
self.WAIT_TIME = wait_time
|
||
self._frames = []
|
||
self.VOLUME = volume
|
||
self.GAP = gap
|
||
self.running = True
|
||
|
||
def __recording(self):
|
||
p = pyaudio.PyAudio()
|
||
stream = p.open(format=self.FORMAT,
|
||
channels=self.CHANNELS,
|
||
rate=self.RATE,
|
||
input=True,
|
||
frames_per_buffer=self.CHUNK)
|
||
count_small = 0
|
||
count_loop = 0
|
||
end_flag = 0
|
||
begin = time.time()
|
||
overtime_flag = 0
|
||
while True:
|
||
final = time.time()
|
||
elapse = final - begin
|
||
if elapse >= self.WAIT_TIME:
|
||
overtime_flag = 1
|
||
break
|
||
data_check = stream.read(self.CHUNK)
|
||
audio_data = np.frombuffer(data_check, dtype=np.short)
|
||
temp = np.max(audio_data)
|
||
if temp > self.VOLUME:
|
||
print("开始录音: ", temp)
|
||
frames = self._frames
|
||
frames.append(data_check)
|
||
while True:
|
||
data_record = stream.read(self.CHUNK)
|
||
frames.append(data_record)
|
||
audio_data = np.frombuffer(data_record, dtype=np.short)
|
||
count_one_loop = np.sum(audio_data > self.VOLUME / 2)
|
||
if count_one_loop < self.CHUNK / 2:
|
||
count_small = count_small + 1
|
||
if count_small > self.GAP:
|
||
print("录音结束")
|
||
end_flag = 1
|
||
break
|
||
else:
|
||
count_small = 0
|
||
self.GAP = self.GAP + 0.1
|
||
count_loop = count_loop + 1
|
||
if count_loop > 10000:
|
||
break
|
||
|
||
if end_flag == 1:
|
||
break
|
||
stream.stop_stream()
|
||
stream.close()
|
||
p.terminate()
|
||
return overtime_flag
|
||
|
||
def __save(self, filename):
|
||
p = pyaudio.PyAudio()
|
||
if not filename.endswith(".wav"):
|
||
filename = filename + ".wav"
|
||
wf = wave.open(filename, 'wb')
|
||
wf.setsampwidth(2)
|
||
wf.setnchannels(self.CHANNELS)
|
||
wf.setframerate(self.RATE)
|
||
wf.writeframes(b''.join(self._frames))
|
||
wf.close()
|
||
|
||
def __obtain_vol(self, vol):
|
||
vl = vol
|
||
volume_system = 100
|
||
if 10 < vl <= 20:
|
||
volume_system = 200
|
||
if 20 <= vl < 40:
|
||
volume_system = 500
|
||
if 40 <= vl < 60:
|
||
volume_system = 700
|
||
if 60 <= vl < 80:
|
||
volume_system = 1000
|
||
if 80 <= vl < 90:
|
||
volume_system = 1600
|
||
if 90 <= vl:
|
||
volume_system = 1800
|
||
self.VOLUME = volume_system
|
||
|
||
def run_thread(self, volume):
|
||
print("音量:", volume)
|
||
self.__obtain_vol(volume)
|
||
flag_asr = 0
|
||
model_param = []
|
||
model_name = []
|
||
model_param_str = ''
|
||
model_name_str = ''
|
||
model_op_str = ''
|
||
err_message = ''
|
||
model_op = [0]
|
||
overtime_flag = self.__recording()
|
||
# 录音结束, 开始识别
|
||
if overtime_flag == 0:
|
||
if "asr" not in os.getcwd():
|
||
os.chdir("pyScripts/asr")
|
||
self.__save("command.wav")
|
||
begin = time.time()
|
||
subprocess.run(
|
||
"decoder_main.exe --chunk_size 16 --wav_path command.wav --model_path final.zip --dict_path words.txt --result result.txt",
|
||
shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
end = time.time()
|
||
print("识别时间:", end - begin)
|
||
#修改易错词
|
||
with open('result.txt', encoding='utf-8') as f:
|
||
content = f.read().strip()
|
||
print(content)
|
||
err_dict = {"武": '五', "酒": '九', "幺": "一", "山式": "三十", "山石": "三十", "高木": "高五", "围角": "为九",
|
||
"士": "十", "一致": "移至", "伍": "五", "为期": "为七", "闪色": "散射"}
|
||
for i in err_dict.keys():
|
||
if i in content:
|
||
content = content.replace(i, err_dict[i])
|
||
content = cn2an.transform(content, "cn2an")
|
||
print("识别结果:", content)
|
||
|
||
# 判断是否选择了模型(某号或某个模型)
|
||
flag_asr = file_operation(content, flag_asr)
|
||
if flag_asr == 0:
|
||
model_name, flag_asr = choose_model(content, flag_asr)
|
||
|
||
# 如果选择了模型,则进行模型求解识别和CAD操作识别, 目前CAD只能识别一种操作
|
||
if flag_asr == 100:
|
||
content = cn2an.transform(content, "cn2an")
|
||
model_op = Operator(content)
|
||
model_param, flag_asr, err_message = Move(content, flag_asr)
|
||
if flag_asr == 100:
|
||
model_param, flag_asr, err_message = Rotation(content, flag_asr)
|
||
if flag_asr == 100:
|
||
model_param, flag_asr, err_message = Mirror(content, flag_asr)
|
||
# 如果选择的模型大于1个,则进行模型求和求差等多模型CAD交互操作识别
|
||
if len(model_name) > 1:
|
||
flag_asr = Summation(content, flag_asr)
|
||
if flag_asr == 100:
|
||
flag_asr = Difference(content, flag_asr)
|
||
if flag_asr == 100:
|
||
flag_asr = Intersection(content, flag_asr)
|
||
# 如果没有选择模型,则进行模型创建以及生成网格识别,同时识别是否对生成的模型进行散射等求解操作,目前只能生成一个模型
|
||
if flag_asr == 0:
|
||
err_dict = {"-": " ", "块": "宽", "凹": "高", "常": "长", "花": "宽", "搞": "高", "矿": "宽", "超": "长", "匡": "宽",
|
||
"昂": "长", "半斤": "半径", "伴君": "半径", "半军": "半径", "搬进": "半径", "阿维": "高为", "散烁": "散射", "反射": "散射",
|
||
"散车": "散射"}
|
||
for i in err_dict.keys():
|
||
if i in content:
|
||
content = content.replace(i, err_dict[i])
|
||
model_param, flag_asr, model_op = Cube(content, model_param, flag_asr)
|
||
if flag_asr == 0:
|
||
model_param, flag_asr, model_op = Cylinder(content, model_param, flag_asr)
|
||
if flag_asr == 0:
|
||
model_param, flag_asr, model_op = Sphere(content, model_param, flag_asr)
|
||
if flag_asr == 0:
|
||
model_param, flag_asr, model_op = Cone(content, model_param, flag_asr)
|
||
if flag_asr == 0:
|
||
flag_asr = SurfaceMesh(content, flag_asr)
|
||
flag_asr = VolumeMesh(content, flag_asr)
|
||
flag_asr = import_mesh(content, flag_asr)
|
||
flag_asr = output_mesh(content, flag_asr)
|
||
# 转换list至byte str, 方便传参
|
||
for i in model_param:
|
||
temp = i
|
||
model_param_str = model_param_str + temp + ','
|
||
model_param_str = list(model_param_str)
|
||
if model_param_str:
|
||
model_param_str.pop(-1)
|
||
model_param_str = ''.join(model_param_str)
|
||
model_param_str = '[' + model_param_str + ']'
|
||
model_param_str = bytes(model_param_str, encoding='utf-8')
|
||
for i in model_name:
|
||
temp = i
|
||
model_name_str = model_name_str + temp + ','
|
||
model_name_str = list(model_name_str)
|
||
if model_name_str:
|
||
model_name_str.pop(-1)
|
||
model_name_str = ''.join(model_name_str)
|
||
model_name_str = '[' + model_name_str + ']'
|
||
model_name_str = bytes(model_name_str, encoding='utf-8')
|
||
for i in model_op:
|
||
temp = str(i)
|
||
model_op_str = model_op_str + temp + ','
|
||
model_op_str = list(model_op_str)
|
||
print(model_op_str)
|
||
if model_op_str:
|
||
model_op_str.pop(-1)
|
||
model_op_str = ''.join(model_op_str)
|
||
model_op_str = '[' + model_op_str + ']'
|
||
model_op_str = bytes(model_op_str, encoding='utf-8')
|
||
if err_message:
|
||
err_message = bytes(err_message, encoding='utf-8')
|
||
print(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
|
||
# model_name为选择的模型名称,bytes str;
|
||
# model_param_str为模型参数,bytes str;
|
||
# model_op为模型操作,bytes str;
|
||
# flag_asr为操作类型, int
|
||
# err_message为错误信息, bytes str
|
||
mw.voiceControlASR(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
|
||
else:
|
||
mw.voiceControlASR(flag_asr, model_param_str, model_name_str, model_op_str, err_message)
|
||
|