基于通义多模态大模型的实时音视频交互

简介: Qwen-Omni是通义千问系列的全新多模态大模型,支持文本、图像、音频和视频的输入,并输出文本和音频。Omni-Realtime服务针对实时交互场景优化,提供低延迟的人机交互体验。

Qwen-Omni 系列模型支持输入多种模态的数据,包括视频、音频、图片、文本,并输出文本和音频数据。Qwen-Omni-Realtime是针对实时交互场景部署的Omni服务,支持音频、视频的流式输入和输出功能,并且在推理上进行优化,提供了低延迟的人机交互服务。

本文会根据示例代码,介绍如何接入Qwen-Omni-Realtime服务并进行交互。您将可以完成和演示视频相同的交互体验。

💡说明:在本文中使用Python作为演示语言,完整示例代码地址Github 中的视频交互示例

建议您下载代码后亲自运行示例代码。您也可以访问该项目了解Java SDK以及更多的示例。

Qwen-Omni多模态大模型

Qwen-Omni是Qwen系列中全新的旗舰级端到端多模态大模型,专为全面的多模式感知设计,无缝处理包括文本、图像、音频和视频在内的各种输入,同时支持流式的文本生成和自然语音合成输出。



模型采用了全新的Thinker-Talker架构,这是一种端到端的多模态模型,旨在支持文本/图像/音频/视频的跨模态理解。关于模型的详细介绍可以访问官方Github仓库。

目前可直接在阿里云百炼平台通过API调用该模型。

https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2880812

https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2867839

Omni-Realtime服务介绍

Omni-Realtime服务集成了最新版本的Omni模型,并且在推理上进行优化,部署在百炼大模型服务平台,提供低延迟的人机交互服务API。

Omni-Realtime的服务协议采用和OpenAI的Realtime API相同的协议风格。在协议中有如下概念需要关注:

会话(session):Omni-Realtime的服务以连接为单位提供服务,每一个连接对应一个会话,在一次会话中可以进行多轮的交互。在全部的交互结束后关闭连接以结束会话。

云端缓冲区(buffer):在一次对话中,用户的音频、文本和视频输入是以流的方式实时上传的,在对话结束前、已上传的音频和图片将保存在缓冲区中。

响应(response):响应是指Omni大模型对用户输入的回复。在一次响应中,大模型的音频和文本输出会按照流的方式下发。文本的下发速度会比音频更快。

    ○在vad模式中,服务端会使用语音活动检测(VAD)算法检测用户的一次提问是否结束,并且自动创建响应。

   ○在manual模式中,用户可以自己决定何时停止发送音视频、并创建响应。

打断:响应是可以打断的。在server_vad中,如果用户在响应下发时开始下一轮的提问,将会打断当前的响应。

对话:从用户开始输入新一轮的音视频开始、到一次响应结束(或打断)是一次对话。

VAD模式交互流程

更多事件名称和介绍请参考Omni-Realtime的服务协议

Omni-Realtime SDK

我们提供了Java和Python两种语言的官方Omni-Realtime SDK,可以让您更方便的接入服务。有关SDK的安装请参考官网文档,Omni SDK设计理念为保持服务协议的交互逻辑,因此在交互流程上和服务协议完全一致,这里不做过多介绍,具体的接口说明请参考SDK文档



VAD模式调用

在VAD (语音活动检测) 模式下,使用者可以不关注语音的开始和结束,服务端会通过VAD算法自动检测一次提问的结束并且生成回答。

主线程调用

下面我们正式进入代码环节,下方的示例代码展示了如何通过Python SDK在VAD模式下调用Omni:

#回调
callback = MyCallback()

conversation = OmniRealtimeConversation(
    model='qwen-omni-turbo-realtime-latest',
    callback=callback, 
    )

conversation.connect()

conversation.update_session(
    output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
    voice=voice,
    input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
    output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
    enable_input_audio_transcription=True,
    input_audio_transcription_model='gummy-realtime-v1',
    enable_turn_detection=True,
    turn_detection_type='server_vad',
)

#多轮交互,在server_vad模式下,服务会自动处理打断,用户可以持续发送静音
while True:
    # 阻塞的从麦克风接收100ms音频。
    audio_data = mic_stream.read(3200, exception_on_overflow=False)
    record_pcm_file.write(audio_data)
    audio_b64 = base64.b64encode(audio_data).decode('ascii')
    # 发送音频帧
    conversation.append_audio(audio_b64)
    video_frame = latest_image.get()
    # 如果存在新的视频帧则发送。
    if video_frame:
        conversation.append_video(video_frame)

在这个示例代码中包含如下部分:

1.创建连接:我们首先创建了一个带有callback的OmniRealtimeConversation对象,并且通过connect接口和服务端建立连接。在建立连接后,服务端会立刻创建一个会话,并且通过session.created事件下发session的信息、通过session.updated事件下发服务端的默认配置。

2.配置服务:通过update_session接口开启输入语音转写和VAD模式。

3.发送音视频:在循环中持续的发送音频和视频帧(图片)。在VAD模式下,音频和视频流在对话中和对话之间不需要中断。服务端会自动检测语音的开始和结束、触发大模型回复或打断、并过滤回复时用户发送的静音。

回调事件处理

class MyCallback(OmniRealtimeCallback):
    def on_open(self) -> None:
        global pya
        global mic_stream
        global b64_player
        print('connection opened, init microphone')
        pya = pyaudio.PyAudio()
        mic_stream = pya.open(format=pyaudio.paInt16,
                        channels=1,
                        rate=16000,
                        input=True)
        b64_player = B64PCMPlayer(pya)

    def on_close(self, close_status_code, close_msg) -> None:
        print('connection closed with code: {}, msg: {}, destroy microphone'.format(close_status_code, close_msg))
        sys.exit(0)

    def on_event(self, response: str) -> None:
        try:
            global conversation
            global b64_player
            type = response['type']
            if 'session.created' == type:
                print('start session: {}'.format(response['session']['id']))
            if 'conversation.item.input_audio_transcription.completed' == type:
                print('question: {}'.format(response['transcript']))
            if 'response.audio_transcript.delta' == type:
                text = response['delta']
                print("got llm response delta: {}".format(text))
            if 'response.audio.delta' == type:
                recv_audio_b64 = response['delta']
                b64_player.add_data(recv_audio_b64)
            if 'input_audio_buffer.speech_started' == type:
                print('======VAD Speech Start======')
                b64_player.cancel_playing()
            if 'response.done' == type:
                print('======RESPONSE DONE======')
                print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                                conversation.get_last_response_id(), 
                                conversation.get_last_first_text_delay(), 
                                conversation.get_last_first_audio_delay(),
                                ))
        except Exception as e:
            print('[Error] {}'.format(e))
            return

在上面的回调函数中,我们主要关注了六种服务端事件:

session.created事件:解析session_id,用于后续调试和debug。

input_audio_buffer.speech_started事件:服务端检测到新一段语音开始,此时如果上一次对话的回复还在播放,则会触发打断。

   ○当出现打断时:由于语音的生成速度大于播放速度,需要清空本地缓存的音频,我们在播放器中集成了打断操作。

conversation.item.input_audio_transcription.completed事件:获取输入语音的转写文本并打印。

response.audio_transcript.delta事件:获取服务端流式下发的大模型回复文本帧并打印。

response.audio.delta事件:获取服务端流式下发的大模型回复音频帧并播放。

response.done事件:大模型一次回复结束,在这里打印了本地统计的延迟等信息。需要注意,由于语音的生成速度大于播放速度,因此大模型回复结束之后本地的音频播放没有结束。

音视频采集

音频采集

示例项目使用了pyaudio作为音频采集器,按照100ms一帧的大小阻塞采集麦克风的输入音频。需要注意pyaudio是线程不安全的,因此在多线程调用时需要加锁。

视频采集

def start_receive_image():
    async def handler(websocket, path):
        print('连接到前端摄像头')
        async for message in websocket:
            if isinstance(message, bytes):
                latest_image.set(base64.b64encode(message).decode('ascii'))
        print('前端摄像头连接结束')

    async def run_server():
        async with websockets.serve(handler, "localhost", 5000):
            print("WebSocket server started on ws://localhost:5000/video")
            await asyncio.Future()  # 保持服务运行

    asyncio.run(run_server())

在示例项目中,我们演示了如何从网络上获取视频帧并编码为base64字符串。需要注意的是,omni-realtime推荐的视频输入帧率为1 ffps,并且每张图片的大小不超过500KB

我们使用浏览器采集视频并且通过websocket实时发送给python程序。

流式播放器

import contextlib
import time
import pyaudio
import threading
import queue
import base64

class B64PCMPlayer:
    def __init__(self, pya: pyaudio.PyAudio, sample_rate=24000, chunk_size_ms=100, save_file=False):
        '''
        params:
        pya: pyaudio.PyAudio
        sample_rate: int, sample rate of audio
        chunk_size_ms: int, chunk size of audio in milliseconds, this will effect cancel latency
        '''

        self.pya = pya
        self.sample_rate = sample_rate
        self.chunk_size_bytes = chunk_size_ms * sample_rate *2 // 1000
        self.player_stream = pya.open(format=pyaudio.paInt16,
                channels=1,
                rate=sample_rate,
                output=True)

        self.raw_audio_buffer: queue.Queue = queue.Queue()
        self.b64_audio_buffer: queue.Queue = queue.Queue()
        self.status_lock = threading.Lock()
        self.status = 'playing'
        self.decoder_thread = threading.Thread(target=self.decoder_loop)
        self.player_thread = threading.Thread(target=self.player_loop)
        self.decoder_thread.start()
        self.player_thread.start()
        self.complete_event: threading.Event = None
        self.save_file = save_file
        if self.save_file:
            self.out_file = open('result.pcm', 'wb')

    def decoder_loop(self):
        while self.status != 'stop':
            recv_audio_b64 = None
            with contextlib.suppress(queue.Empty):
                recv_audio_b64 = self.b64_audio_buffer.get(timeout=0.1)
            if recv_audio_b64 is None:
                continue
            recv_audio_raw = base64.b64decode(recv_audio_b64)
            # push raw audio data into queue by chunk
            for i in range(0, len(recv_audio_raw), self.chunk_size_bytes):
                chunk = recv_audio_raw[i:i + self.chunk_size_bytes]
                self.raw_audio_buffer.put(chunk)
                if self.save_file:
                    self.out_file.write(chunk)

    def player_loop(self):
        while self.status != 'stop':
            recv_audio_raw = None
            with contextlib.suppress(queue.Empty):
                recv_audio_raw = self.raw_audio_buffer.get(timeout=0.1)
            if recv_audio_raw is None:
                if self.complete_event:
                    self.complete_event.set()
                continue
            # write chunk to pyaudio audio player, wait until finish playing this chunk.
            self.player_stream.write(recv_audio_raw)

    def cancel_playing(self):
        self.b64_audio_buffer.queue.clear()
        self.raw_audio_buffer.queue.clear()

    def add_data(self, data):
        self.b64_audio_buffer.put(data)

    def wait_for_complete(self):
        self.complete_event = threading.Event()
        self.complete_event.wait()
        self.complete_event = None

    def shutdown(self):
        self.status = 'stop'
        self.decoder_thread.join()
        self.player_thread.join()
        self.player_stream.close()
        if self.save_file:
            self.out_file.close()

我们提供了一种Python中基于pyaudio的播放器实现参考。在播放器中:

1.我们使用了pyaudio作为播放器,pyaudio可以阻塞播放pcm格式的音频。

2.我们将解码和播放分开不同的线程,避免解码带来的播放卡顿:

     a.有新音频时,放入编码队列中。

     b.生产者线程:将编码队列中的base64音频解码为pcm采样点序列,并且按照最大chunk=100ms切割放入音频队列。

c.消费者线程:将音频队列中的音频按照chunk播放。生产者和消费者都采用阻塞获取数据,最大等待100ms避免忙等待。

3.当出现打断事件时,清空编码队列和音频队列,此时播放器中最多还存有chunk=100ms的音频,因此打断延迟最大为100ms。

总结

在本文中,介绍了基于Qwen-Omni-Realtime SDK的多模态服务接入方案,通过WebSocket协议实现低延迟的实时音视频交互,并构建支持对话打断的示例程序。由于音视频采集、播放器等组件的集成,项目复杂度较高,本文旨在帮助开发者快速掌握基础交互流程,为构建复杂系统提供技术参考。

针对服务接入过程中可能遇到的问题,开发者可通过加入语音团队开发者交流群获取支持。同时,欢迎将基于Omni模型的创新应用场景(如多模型融合方案)通过PR提交至开源示例Github代码库,共同完善具备技术深度与实用价值的Gallery工程案例。

 image.png


🏀如果想要了解更多通义大模型的模型详细信息以及直接进入体验,可以点击🔗https://www.aliyun.com/product/tongyi直接进入查看和体验哦~~

如果需要进一步了解阿里云百炼的,可直接点击下面链接哦:

👉阿里云百炼详情了解可点击此官网链接:阿里云百炼官网介绍

👉阿里云百炼控制台页面可点击此链接直接进入阿里云百炼控制台


相关文章
|
2月前
|
存储 人工智能 NoSQL
AI大模型应用实践 八:如何通过RAG数据库实现大模型的私有化定制与优化
RAG技术通过融合外部知识库与大模型,实现知识动态更新与私有化定制,解决大模型知识固化、幻觉及数据安全难题。本文详解RAG原理、数据库选型(向量库、图库、知识图谱、混合架构)及应用场景,助力企业高效构建安全、可解释的智能系统。
|
3月前
|
人工智能 Java API
AI 超级智能体全栈项目阶段一:AI大模型概述、选型、项目初始化以及基于阿里云灵积模型 Qwen-Plus实现模型接入四种方式(SDK/HTTP/SpringAI/langchain4j)
本文介绍AI大模型的核心概念、分类及开发者学习路径,重点讲解如何选择与接入大模型。项目基于Spring Boot,使用阿里云灵积模型(Qwen-Plus),对比SDK、HTTP、Spring AI和LangChain4j四种接入方式,助力开发者高效构建AI应用。
1758 122
AI 超级智能体全栈项目阶段一:AI大模型概述、选型、项目初始化以及基于阿里云灵积模型 Qwen-Plus实现模型接入四种方式(SDK/HTTP/SpringAI/langchain4j)
|
2月前
|
机器学习/深度学习 人工智能 人机交互
当AI学会“看”和“听”:多模态大模型如何重塑人机交互
当AI学会“看”和“听”:多模态大模型如何重塑人机交互
401 121
|
2月前
|
数据采集 人工智能 搜索推荐
智能新纪元:多模态大模型如何重塑人机交互
智能新纪元:多模态大模型如何重塑人机交互
263 113
|
2月前
|
人工智能 人机交互 知识图谱
当AI学会“融会贯通”:多模态大模型如何重塑未来
当AI学会“融会贯通”:多模态大模型如何重塑未来
314 114
|
2月前
|
人工智能 安全 搜索推荐
当AI学会“看”和“听”:多模态大模型如何重塑人机交互
当AI学会“看”和“听”:多模态大模型如何重塑人机交互
297 117
|
3月前
|
机器学习/深度学习 数据采集 人工智能
通义实验室Mobile-Agent-v3开源,全平台SOTA的GUI智能体,支持手机电脑等多平台交互
近日,通义实验室MobileAgent团队正式开源全新图形界面交互基础模型 GUI-Owl,并同步推出支持多智能体协同的自动化框架 Mobile-Agent-v3。该模型基于Qwen2.5-VL打造,在手机端与电脑端共8个GUI任务榜单中全面刷新开源模型性能纪录,达成全平台SOTA。
1192 2
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
AI Compass前沿速览:Qwen3-Max、Mixboard、Qwen3-VL、Audio2Face、Vidu Q2 AI视频生成模型、Qwen3-LiveTranslate-全模态同传大模型
AI Compass前沿速览:Qwen3-Max、Mixboard、Qwen3-VL、Audio2Face、Vidu Q2 AI视频生成模型、Qwen3-LiveTranslate-全模态同传大模型
725 13
AI Compass前沿速览:Qwen3-Max、Mixboard、Qwen3-VL、Audio2Face、Vidu Q2 AI视频生成模型、Qwen3-LiveTranslate-全模态同传大模型
|
3月前
|
机器学习/深度学习 编解码 人工智能
InternVL3.5多模态大模型开源发布,1B-241B九种尺寸,支持跨平台GUI自动化与矢量图生成
近日,上海人工智能实验室(上海AI实验室)重磅开源发布了多模态大模型书生·万象 InternVL3.5,通过创新的级联式强化学习(Cascade RL)、动态视觉分辨率路由与解耦部署架构,实现推理能力、部署效率与通用能力的全面升级。
1177 7