Qwen-Omni 系列模型支持输入多种模态的数据,包括视频、音频、图片、文本,并输出文本和音频数据。Qwen-Omni-Realtime是针对实时交互场景部署的Omni服务,支持音频、视频的流式输入和输出功能,并且在推理上进行优化,提供了低延迟的人机交互服务。
本文会根据示例代码,介绍如何接入Qwen-Omni-Realtime服务并进行交互。您将可以完成和演示视频相同的交互体验。
💡说明:在本文中使用Python作为演示语言,完整示例代码地址:Github 中的视频交互示例。 建议您下载代码后亲自运行示例代码。您也可以访问该项目了解Java SDK以及更多的示例。 |
Qwen-Omni多模态大模型
Qwen-Omni是Qwen系列中全新的旗舰级端到端多模态大模型,专为全面的多模式感知设计,无缝处理包括文本、图像、音频和视频在内的各种输入,同时支持流式的文本生成和自然语音合成输出。
模型采用了全新的Thinker-Talker架构,这是一种端到端的多模态模型,旨在支持文本/图像/音频/视频的跨模态理解。关于模型的详细介绍可以访问官方Github仓库。
目前可直接在阿里云百炼平台通过API调用该模型。
https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2880812
https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2867839
Omni-Realtime服务介绍
Omni-Realtime服务集成了最新版本的Omni模型,并且在推理上进行优化,部署在百炼大模型服务平台,提供低延迟的人机交互服务API。
Omni-Realtime的服务协议采用和OpenAI的Realtime API相同的协议风格。在协议中有如下概念需要关注:
●会话(session):Omni-Realtime的服务以连接为单位提供服务,每一个连接对应一个会话,在一次会话中可以进行多轮的交互。在全部的交互结束后关闭连接以结束会话。
●云端缓冲区(buffer):在一次对话中,用户的音频、文本和视频输入是以流的方式实时上传的,在对话结束前、已上传的音频和图片将保存在缓冲区中。
●响应(response):响应是指Omni大模型对用户输入的回复。在一次响应中,大模型的音频和文本输出会按照流的方式下发。文本的下发速度会比音频更快。
○在vad模式中,服务端会使用语音活动检测(VAD)算法检测用户的一次提问是否结束,并且自动创建响应。
○在manual模式中,用户可以自己决定何时停止发送音视频、并创建响应。
●打断:响应是可以打断的。在server_vad中,如果用户在响应下发时开始下一轮的提问,将会打断当前的响应。
●对话:从用户开始输入新一轮的音视频开始、到一次响应结束(或打断)是一次对话。
VAD模式交互流程
更多事件名称和介绍请参考Omni-Realtime的服务协议。
Omni-Realtime SDK
我们提供了Java和Python两种语言的官方Omni-Realtime SDK,可以让您更方便的接入服务。有关SDK的安装请参考官网文档,Omni SDK设计理念为保持服务协议的交互逻辑,因此在交互流程上和服务协议完全一致,这里不做过多介绍,具体的接口说明请参考SDK文档。
VAD模式调用
在VAD (语音活动检测) 模式下,使用者可以不关注语音的开始和结束,服务端会通过VAD算法自动检测一次提问的结束并且生成回答。
主线程调用
下面我们正式进入代码环节,下方的示例代码展示了如何通过Python SDK在VAD模式下调用Omni:
#回调 callback = MyCallback() conversation = OmniRealtimeConversation( model='qwen-omni-turbo-realtime-latest', callback=callback, ) conversation.connect() conversation.update_session( output_modalities=[MultiModality.AUDIO, MultiModality.TEXT], voice=voice, input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT, output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT, enable_input_audio_transcription=True, input_audio_transcription_model='gummy-realtime-v1', enable_turn_detection=True, turn_detection_type='server_vad', ) #多轮交互,在server_vad模式下,服务会自动处理打断,用户可以持续发送静音 while True: # 阻塞的从麦克风接收100ms音频。 audio_data = mic_stream.read(3200, exception_on_overflow=False) record_pcm_file.write(audio_data) audio_b64 = base64.b64encode(audio_data).decode('ascii') # 发送音频帧 conversation.append_audio(audio_b64) video_frame = latest_image.get() # 如果存在新的视频帧则发送。 if video_frame: conversation.append_video(video_frame)
在这个示例代码中包含如下部分:
1.创建连接:我们首先创建了一个带有callback的OmniRealtimeConversation对象,并且通过connect接口和服务端建立连接。在建立连接后,服务端会立刻创建一个会话,并且通过session.created事件下发session的信息、通过session.updated事件下发服务端的默认配置。
2.配置服务:通过update_session接口开启输入语音转写和VAD模式。
3.发送音视频:在循环中持续的发送音频和视频帧(图片)。在VAD模式下,音频和视频流在对话中和对话之间不需要中断。服务端会自动检测语音的开始和结束、触发大模型回复或打断、并过滤回复时用户发送的静音。
回调事件处理
class MyCallback(OmniRealtimeCallback): def on_open(self) -> None: global pya global mic_stream global b64_player print('connection opened, init microphone') pya = pyaudio.PyAudio() mic_stream = pya.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True) b64_player = B64PCMPlayer(pya) def on_close(self, close_status_code, close_msg) -> None: print('connection closed with code: {}, msg: {}, destroy microphone'.format(close_status_code, close_msg)) sys.exit(0) def on_event(self, response: str) -> None: try: global conversation global b64_player type = response['type'] if 'session.created' == type: print('start session: {}'.format(response['session']['id'])) if 'conversation.item.input_audio_transcription.completed' == type: print('question: {}'.format(response['transcript'])) if 'response.audio_transcript.delta' == type: text = response['delta'] print("got llm response delta: {}".format(text)) if 'response.audio.delta' == type: recv_audio_b64 = response['delta'] b64_player.add_data(recv_audio_b64) if 'input_audio_buffer.speech_started' == type: print('======VAD Speech Start======') b64_player.cancel_playing() if 'response.done' == type: print('======RESPONSE DONE======') print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format( conversation.get_last_response_id(), conversation.get_last_first_text_delay(), conversation.get_last_first_audio_delay(), )) except Exception as e: print('[Error] {}'.format(e)) return
在上面的回调函数中,我们主要关注了六种服务端事件:
●session.created事件:解析session_id,用于后续调试和debug。
●input_audio_buffer.speech_started事件:服务端检测到新一段语音开始,此时如果上一次对话的回复还在播放,则会触发打断。
○当出现打断时:由于语音的生成速度大于播放速度,需要清空本地缓存的音频,我们在播放器中集成了打断操作。
●conversation.item.input_audio_transcription.completed事件:获取输入语音的转写文本并打印。
●response.audio_transcript.delta事件:获取服务端流式下发的大模型回复文本帧并打印。
●response.audio.delta事件:获取服务端流式下发的大模型回复音频帧并播放。
●response.done事件:大模型一次回复结束,在这里打印了本地统计的延迟等信息。需要注意,由于语音的生成速度大于播放速度,因此大模型回复结束之后本地的音频播放没有结束。
音视频采集
音频采集
示例项目使用了pyaudio作为音频采集器,按照100ms一帧的大小阻塞采集麦克风的输入音频。需要注意pyaudio是线程不安全的,因此在多线程调用时需要加锁。
视频采集
def start_receive_image(): async def handler(websocket, path): print('连接到前端摄像头') async for message in websocket: if isinstance(message, bytes): latest_image.set(base64.b64encode(message).decode('ascii')) print('前端摄像头连接结束') async def run_server(): async with websockets.serve(handler, "localhost", 5000): print("WebSocket server started on ws://localhost:5000/video") await asyncio.Future() # 保持服务运行 asyncio.run(run_server())
在示例项目中,我们演示了如何从网络上获取视频帧并编码为base64字符串。需要注意的是,omni-realtime推荐的视频输入帧率为1 ffps,并且每张图片的大小不超过500KB。
我们使用浏览器采集视频并且通过websocket实时发送给python程序。
流式播放器
import contextlib import time import pyaudio import threading import queue import base64 class B64PCMPlayer: def __init__(self, pya: pyaudio.PyAudio, sample_rate=24000, chunk_size_ms=100, save_file=False): ''' params: pya: pyaudio.PyAudio sample_rate: int, sample rate of audio chunk_size_ms: int, chunk size of audio in milliseconds, this will effect cancel latency ''' self.pya = pya self.sample_rate = sample_rate self.chunk_size_bytes = chunk_size_ms * sample_rate *2 // 1000 self.player_stream = pya.open(format=pyaudio.paInt16, channels=1, rate=sample_rate, output=True) self.raw_audio_buffer: queue.Queue = queue.Queue() self.b64_audio_buffer: queue.Queue = queue.Queue() self.status_lock = threading.Lock() self.status = 'playing' self.decoder_thread = threading.Thread(target=self.decoder_loop) self.player_thread = threading.Thread(target=self.player_loop) self.decoder_thread.start() self.player_thread.start() self.complete_event: threading.Event = None self.save_file = save_file if self.save_file: self.out_file = open('result.pcm', 'wb') def decoder_loop(self): while self.status != 'stop': recv_audio_b64 = None with contextlib.suppress(queue.Empty): recv_audio_b64 = self.b64_audio_buffer.get(timeout=0.1) if recv_audio_b64 is None: continue recv_audio_raw = base64.b64decode(recv_audio_b64) # push raw audio data into queue by chunk for i in range(0, len(recv_audio_raw), self.chunk_size_bytes): chunk = recv_audio_raw[i:i + self.chunk_size_bytes] self.raw_audio_buffer.put(chunk) if self.save_file: self.out_file.write(chunk) def player_loop(self): while self.status != 'stop': recv_audio_raw = None with contextlib.suppress(queue.Empty): recv_audio_raw = self.raw_audio_buffer.get(timeout=0.1) if recv_audio_raw is None: if self.complete_event: self.complete_event.set() continue # write chunk to pyaudio audio player, wait until finish playing this chunk. self.player_stream.write(recv_audio_raw) def cancel_playing(self): self.b64_audio_buffer.queue.clear() self.raw_audio_buffer.queue.clear() def add_data(self, data): self.b64_audio_buffer.put(data) def wait_for_complete(self): self.complete_event = threading.Event() self.complete_event.wait() self.complete_event = None def shutdown(self): self.status = 'stop' self.decoder_thread.join() self.player_thread.join() self.player_stream.close() if self.save_file: self.out_file.close()
我们提供了一种Python中基于pyaudio的播放器实现参考。在播放器中:
1.我们使用了pyaudio作为播放器,pyaudio可以阻塞播放pcm格式的音频。
2.我们将解码和播放分开不同的线程,避免解码带来的播放卡顿:
a.有新音频时,放入编码队列中。
b.生产者线程:将编码队列中的base64音频解码为pcm采样点序列,并且按照最大chunk=100ms切割放入音频队列。
c.消费者线程:将音频队列中的音频按照chunk播放。生产者和消费者都采用阻塞获取数据,最大等待100ms避免忙等待。
3.当出现打断事件时,清空编码队列和音频队列,此时播放器中最多还存有chunk=100ms的音频,因此打断延迟最大为100ms。
总结
在本文中,介绍了基于Qwen-Omni-Realtime SDK的多模态服务接入方案,通过WebSocket协议实现低延迟的实时音视频交互,并构建支持对话打断的示例程序。由于音视频采集、播放器等组件的集成,项目复杂度较高,本文旨在帮助开发者快速掌握基础交互流程,为构建复杂系统提供技术参考。
针对服务接入过程中可能遇到的问题,开发者可通过加入语音团队开发者交流群获取支持。同时,欢迎将基于Omni模型的创新应用场景(如多模型融合方案)通过PR提交至开源示例Github代码库,共同完善具备技术深度与实用价值的Gallery工程案例。
🏀如果想要了解更多通义大模型的模型详细信息以及直接进入体验,可以点击🔗https://www.aliyun.com/product/tongyi直接进入查看和体验哦~~
如果需要进一步了解阿里云百炼的,可直接点击下面链接哦:
👉阿里云百炼详情了解可点击此官网链接:阿里云百炼官网介绍
👉阿里云百炼控制台页面可点击此链接直接进入:阿里云百炼控制台