#!/usr/bin/env python
# (c) 2015--2018 NovoLanguage, author: David A. van Leeuwen

## Recognition interface for actual backend. Adapted from player.asr.debug.

import json
import sys
import wave
import requests
import websocket
import logging
import collections
import time

from .. import asr

logger = logging.getLogger(__name__)

## turn off annoying warnings
requests.packages.urllib3.disable_warnings()
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.WARN)

buffer_size = 4096

gm = "gm.novolanguage.com"  ## dev
protocol = "https"
port = 443
apiversion = 0

sessions = collections.Counter()


def segmentation(result):
    """Convert a raw backend recognition result into a novo.asr.segments.Segmentation object."""
    for w in result:
        w["score"] = w["confidence"]["prob"]
        w["llh"] = w["confidence"]["llr"]
        w["label"] = w["label"]["raw"]
        w["begin"] /= 10
        w["end"] /= 10
        for p in w["phones"]:
            p["score"] = p["confidence"]["prob"]
            p["llh"] = p["confidence"]["llr"]
            p["begin"] /= 10
            p["end"] /= 10
    return asr.segments.Segmentation(result)


class rpcid:
    """Monotonically increasing id for JSON-RPC requests."""
    id = 0

    @staticmethod
    def next():
        rpcid.id += 1
        return rpcid.id


class Recognizer(object):
    """Log in to the GM API, set up a recognition session on a speech node, and
    stream audio and JSON-RPC commands to it over a websocket."""

    def __init__(self, lang="en", gm=gm, grammar_version="0.1", user=None, password=None, snodeid=None, keepopen=False):
        self.lang = lang
        self.keepopen = keepopen
        self.api_url = "%s://%s:%d/v%d" % (protocol, gm, port, apiversion)
        self.verify = False
        self.headers = {"Content-Type": "application/json"}
        self.login_user(user, password)
        data = {"l2": lang, "local": False, "skipupload": True}
        if snodeid:
            data["snodeid"] = snodeid
        self.conn = None
        self.init_session(data)
        self.grammar_version = grammar_version
        self.last_message = None

    def login_user(self, username, password):
        # obtain authentication token of user
        logger.info('obtain auth token at %s', self.api_url)
        data = {
            'username': username,
            'password': password
        }
        try:
            r = requests.post(self.api_url + '/publishers/1/login', headers=self.headers, data=json.dumps(data), verify=self.verify)
        except Exception as e:
            logger.error("Cannot post request to GM API for user login: %s", e)
            sys.exit(-1)
        assert r.ok, r.reason
        result = r.json()
        if "errors" in result["response"]:
            logger.info("Error in logging in: %s", result["response"]["errors"])
            sys.exit(-1)
        user_auth_token = result['response']['user']['authentication_token']
        logger.info("User auth token is: %s", user_auth_token)

        # set auth token in header
        self.headers['Authentication-Token'] = user_auth_token

    def init_session(self, data, direct=False, use_ip=False):
        logger.info('Request new session: %s', data)
        r = requests.post(self.api_url + '/sessions', headers=self.headers, data=json.dumps(data), verify=self.verify)
        if not r.ok:
            logger.error("New session request failed: %s", r.text)
            return
        status_url = r.headers.get("location")
        if status_url:
            ## we got a redirect: poll until the session is no longer pending
            status = {}
            while True:
                logger.debug("Checking %s", status_url)
                s = requests.get(status_url, verify=self.verify)
                if not s.ok:
                    logger.error('Checking Failed: %s', s.text)
                    return
                status = s.json()
                if status['status'] == 'PENDING':
                    logger.debug("Status: %s", status['status'])
                    time.sleep(1)
                else:
                    break
            session = status['result'][0]  ## [1] is another status code...
            if "error" in session:
                logger.error("Error in getting a snode: %s", session["error"])
                raise Exception(session["error"])
        else:
            session = r.json()
        try:
            logger.info("Session: %r", session)
            if direct:
                snode_ip = session["snode"]["ip"]
                proxy_url = snode_ip
                snode_port = session["port"]
                ws_url = "%s://%s:%d/" % ("ws", snode_ip, snode_port)
            else:
                field = "ip" if use_ip else "hostname"
                proxy_url = session['snode']['datacentre']['proxy'][field]
                ws_url = 'wss://' + proxy_url + '/' + session['uuid']
            logger.info("Connecting to websocket: %s", ws_url)
            conn = websocket.create_connection(ws_url, sslopt={"check_hostname": self.verify})
            logger.info("Connected.")
        except Exception as e:
            logger.error("Unable to connect to websocket: %s", e)
            raise
        self.session_id = session['id']
        self.proxy_url = proxy_url
        self.conn = conn
        self.session = session
        sessions[session["uuid"]] += 1

    def setgrammar(self, grammar):
        ## backend grammar object: {"data": {...}, "type": "confusion_network"}
        data = {"jsonrpc": "2.0", 'type': 'jsonrpc', 'method': 'set_grammar', 'params': grammar, "id": rpcid.next()}
        asr.spraaklab.schema.validate_rpc_grammar(grammar)
        self.conn.send(json.dumps(data))
        result = json.loads(self.conn.recv())
        if result.get("error"):
            logger.error("Exercise validation error: %s", result)
        return result

    def set_alternatives_grammar(self, *args, **kwargs):
        if "version" not in kwargs:
            kwargs["version"] = self.grammar_version
        return self.setgrammar(alternatives_grammar(*args, **kwargs))

    def recognize_wav(self, wavf):
        """Check that the wav file holds 16 kHz mono 16-bit uncompressed PCM, then recognize it."""
        w = wave.open(wavf, 'r')
        nchannels, sampwidth, framerate, nframes, comptype, compname = w.getparams()
        if nchannels > 1:
            logger.error("Please use .wav with only 1 channel, found %d channels in %s", nchannels, wavf)
            return
        if sampwidth != 2:
            logger.error("Please use .wav with 2-byte PCM data, found %d bytes in %s", sampwidth, wavf)
            return
        if framerate != 16000:
            logger.error("Please use .wav sampled at 16000 Hz, found %1.0f in %s", framerate, wavf)
            return
        if comptype != 'NONE':
            logger.error("Please use .wav with uncompressed data, found %s in %s", compname, wavf)
            return
        buf = w.readframes(nframes)
        w.close()
        return self.recognize_data(buf)

    def recognize_data(self, buf):
        """Stream raw audio bytes to the backend in buffer_size chunks and return the word segmentation."""
        nbytes_sent = 0
        start = time.time()
        for j in range(0, len(buf), buffer_size):
            audio_packet = buf[j:j + buffer_size]
            nbytes_sent += len(audio_packet)
            self.conn.send_binary(audio_packet)
        self.conn.send(json.dumps({"jsonrpc": "2.0", "method": "get_result", "id": rpcid.next()}))
        logger.info("Waiting for recognition result...")
        self.last_message = self.conn.recv()  ## keep result for the interested applications
        message = json.loads(self.last_message)
        dur = time.time() - start
        logger.info("Recognition took %5.3f seconds", dur)
        if "error" in message:
            raise RuntimeError("Error from recognition backend: %r" % message.get("error"))
        return segmentation(message["result"]["words"])

    def recognize_url(self, url):
        """Have the backend fetch and recognize audio from a URL and return the word segmentation."""
        start = time.time()
        data = json.dumps({"jsonrpc": "2.0", "method": "send_audio", "id": rpcid.next(), "params": {"type": "url", "data": url, "details": ["word", "utterance"]}})
        self.conn.send(data)
        logger.info("Waiting for recognition result...")
        self.last_message = self.conn.recv()  ## keep result for the interested applications
        print(self.last_message)
        message = json.loads(self.last_message)
        dur = time.time() - start
        logger.info("Recognition took %5.3f seconds", dur)
        if "error" in message:
            raise RuntimeError("Error from recognition backend: %r" % message.get("error"))
        return segmentation(message["result"]["words"])

    def __del__(self):
        ## close the websocket and delete the backend session once the last Recognizer sharing it goes away
        sessions[self.session["uuid"]] -= 1
        if self.conn and sessions[self.session["uuid"]] <= 0:
            self.conn.close()
            url = self.api_url + '/sessions/%d' % self.session_id
            if self.keepopen:
                logger.info("Keeping session open...")
            else:
                logger.info("Closing session: %s", url)
                r = requests.delete(url, headers=self.headers, verify=self.verify)
                assert r.ok, r.reason


def alternatives_grammar(parts, version="0.1", ret=None):
    """Make a grammar of alternatives, as array(sequence)-of-array(alternatives)-of-strings."""
    r = {"type": "confusion_network", "version": version}
    if version == "0.1":
        r["data"] = {"type": "multiple_choice", "parts": parts}
        if isinstance(ret, list) and "dict" in ret:
            r["return_dict"] = True
    elif version == "1.0":
        seqels = []
        for part in parts:
            altels = []
            for alt in part:
                words = alt.split(" ")
                if len(words) > 1:
                    alt = {"kind": "sequence", "elements": words}
                altels.append(alt)
            seqels.append({"kind": "alternatives", "elements": altels})
        r["data"] = {"kind": "sequence", "elements": seqels}
        if isinstance(ret, list):
            r["return_objects"] = ret
    else:
        raise ValueError("Unsupported version: %s" % version)
    asr.spraaklab.schema.validate_rpc_grammar(r)
    return r
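

## ---------------------------------------------------------------------------
## Usage sketch (illustrative only, not exercised by the library itself).
## Assumptions: the module is run inside its package (e.g. "python -m ..."),
## so the relative import of asr resolves; the credentials and the wav file
## name are hypothetical placeholders. With the default grammar version "0.1",
## set_alternatives_grammar() below sends a grammar object of the form
##   {"type": "confusion_network", "version": "0.1",
##    "data": {"type": "multiple_choice",
##             "parts": [["hello", "good morning"], ["world", "everyone"]]}}
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    rec = Recognizer(lang="en", user="demo-user", password="demo-password")
    ## two-part multiple-choice exercise: each part lists its allowed alternatives
    rec.set_alternatives_grammar([["hello", "good morning"], ["world", "everyone"]])
    seg = rec.recognize_wav("prompt_16kHz_mono.wav")  ## 16 kHz, mono, 16-bit PCM
    print(seg)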