文章目录
- 一、创建应用
- 二、测试接口
一、创建应用
点击控制台,创建新应用
点击应用,进入详情页,可获取APPID、APISecret、APIKey
二、测试接口
安装包
pip install websocket
pip install websocket-client==0.57.0
#!/usr/bin/env python
# -*- coding: utf-8 -*-import base64
import hashlib
import hmac
import json
import os
import timeimport pandas as pd
import requestslfasr_host = 'http://raasr.xfyun.cn/api'# 请求的接口名
api_prepare = '/prepare'
api_upload = '/upload'
api_merge = '/merge'
api_get_progress = '/getProgress'
api_get_result = '/getResult'
# 文件分片大小10M
file_piece_sice = 10485760# ——————————————————转写可配置参数————————————————
# 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改
# 转写类型
lfasr_type = 0
# 是否开启分词
has_participle = 'false'
has_seperate = 'true'
# 多候选词个数
max_alternatives = 0
# 子用户标识
suid = ''class SliceIdGenerator:"""slice id生成器"""def __init__(self):self.__ch = 'aaaaaaaaa`'def getNextSliceId(self):ch = self.__chj = len(ch) - 1while j >= 0:cj = ch[j]if cj != 'z':ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:]breakelse:ch = ch[:j] + 'a' + ch[j + 1:]j = j - 1self.__ch = chreturn self.__chclass RequestApi(object):def __init__(self, appid, secret_key, upload_file_path):self.appid = appidself.secret_key = secret_keyself.upload_file_path = upload_file_path# 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换def gene_params(self, apiname, taskid=None, slice_id=None):appid = self.appidsecret_key = self.secret_keyupload_file_path = self.upload_file_pathspeaker_number = 2role_type = 2ts = str(int(time.time()))m2 = hashlib.md5()m2.update((appid + ts).encode('utf-8'))md5 = m2.hexdigest()md5 = bytes(md5, encoding='utf-8')# 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signasigna = hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest()signa = base64.b64encode(signa)signa = str(signa, 'utf-8')file_len = os.path.getsize(upload_file_path)file_name = os.path.basename(upload_file_path)param_dict = {}if apiname == api_prepare:# slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1)param_dict['app_id'] = appidparam_dict['signa'] = signaparam_dict['speaker_number'] = speaker_numberparam_dict['has_seperate'] = has_seperateparam_dict['role_type'] = role_typeparam_dict['ts'] = tsparam_dict['file_len'] = str(file_len)param_dict['file_name'] = file_nameparam_dict['slice_num'] = str(slice_num)elif apiname == api_upload:param_dict['app_id'] = appidparam_dict['signa'] = signaparam_dict['speaker_number'] = speaker_numberparam_dict['has_seperate'] = has_seperateparam_dict['role_type'] = role_typeparam_dict['ts'] = tsparam_dict['task_id'] = taskidparam_dict['slice_id'] = slice_idelif apiname == api_merge:param_dict['app_id'] = appidparam_dict['signa'] = signaparam_dict['speaker_number'] = speaker_numberparam_dict['has_seperate'] = has_seperateparam_dict['role_type'] = role_typeparam_dict['ts'] = tsparam_dict['task_id'] = taskidparam_dict['file_name'] = file_nameelif apiname == api_get_progress or apiname == api_get_result:param_dict['app_id'] = appidparam_dict['signa'] = signaparam_dict['speaker_number'] = speaker_numberparam_dict['has_seperate'] = has_seperateparam_dict['role_type'] = role_typeparam_dict['ts'] = tsparam_dict['task_id'] = taskidreturn param_dict# 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.htmldef gene_request(self, apiname, data, files=None, headers=None):response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers)result = json.loads(response.text)if result["ok"] == 0:print("{} success:".format(apiname) + str(result))return resultelse:print("{} error:".format(apiname) + str(result))exit(0)return result# 预处理def prepare_request(self):return self.gene_request(apiname=api_prepare,data=self.gene_params(api_prepare))# 上传def upload_request(self, taskid, upload_file_path):file_object = open(upload_file_path, 'rb')try:index = 1sig = SliceIdGenerator()while True:content = file_object.read(file_piece_sice)if not content or len(content) == 0:breakfiles = {"filename": self.gene_params(api_upload).get("slice_id"),"content": content}response = self.gene_request(api_upload,data=self.gene_params(api_upload, taskid=taskid,slice_id=sig.getNextSliceId()),files=files)if response.get('ok') != 0:# 上传分片失败print('upload slice fail, response: ' + str(response))return Falseprint('upload slice ' + str(index) + ' success')index += 1finally:'file index:' + str(file_object.tell())file_object.close()return True# 合并def merge_request(self, taskid):return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid))# 获取进度def get_progress_request(self, taskid):return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid))# 获取结果def get_result_request(self, taskid):return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid))def all_api_request(self):# 1. 预处理pre_result = self.prepare_request()taskid = pre_result["data"]# 2 . 分片上传self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path)# 3 . 文件合并self.merge_request(taskid=taskid)# 4 . 获取任务进度while True:# 每隔20秒获取一次任务进度progress = self.get_progress_request(taskid)progress_dic = progressif progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605:print('task error: ' + progress_dic['failed'])returnelse:data = progress_dic['data']task_status = json.loads(data)if task_status['status'] == 9:print('task ' + taskid + ' finished')breakprint('The task ' + taskid + ' is in processing, task status: ' + str(data))# 每次获取进度间隔20Stime.sleep(20)# 5 . 获取结果res = self.get_result_request(taskid=taskid)# 保存到txtdata = res['data']data = eval(data)return data# 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)
# 输入讯飞开放平台的appid,secret_key和待转写的文件路径
if __name__ == '__main__':# 解析单条语音并保存到xlsapi = RequestApi(appid="", secret_key="",upload_file_path=r"data/record/20220715014532.mp3")xf_result = api.all_api_request()print(xf_result)