改进Android语音对讲系统的方法

发布时间:2020-07-06 07:58:45 作者:xsster
来源:网络 阅读:615

本文主要包括以下内容:

  1. 通过生产者-消费者模式保证数据链路的鲁棒性

  2. 改进音频录制及播放,提高语音通信质量

  3. 采用多播实现设备发现及跨路由通信

  4. 实现对讲进程与UI进程的通信(AIDL)

一、通过生产者-消费者模式保证数据链路的鲁棒性

1. 从责任链到生产者-消费者

在《实时Android语音对讲系统架构》对语音对讲系统的数据链路的分析中提到,数据包要经过Record、Encoder、Transmission、Decoder、Play这一链条的处理,这种数据流转就是对讲机核心抽象,鉴于这种场景,采用了责任链设计模式。

在后续实践中发现这样的结构存在一些问题,责任链模式适用于数据即时流转,需要整个链路没有阻塞、等待。而在本应用场景中,编解码及录制播放均可能存在时间延迟,责任链模式无法兼顾网络、编解码的延时。

事实上,通过缓存队列则可以保证数据链路的稳定性,分别在编解码和数据发送接收时加入阻塞队列,可以实现数据包的缓冲,同时降低丢包的可能。因此,在本系统场景下,基于阻塞队列实现了生产者-消费者模式,是对责任链模式的优化,意在提高数据链路的鲁棒性。

2. 基于阻塞队列实现生产者-消费者模式

本节包括以下内容:

阻塞队列(数据结构)

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
检查方法element()peek()不可用不可用

本文通过LinkedBlockingQueue的put和take方法实现线程阻塞。LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

首先看下LinkedBlockingQueue中核心的域:

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}private final int capacity;private final AtomicInteger count = new AtomicInteger();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();

其次,LinkedBlockingQueue有三个构造方法,分别如下:

public LinkedBlockingQueue() {    this(Integer.MAX_VALUE);
}public LinkedBlockingQueue(int capacity) {    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;
    last = head = new Node<E>(null);
}public LinkedBlockingQueue(Collection<? extends E> c) {    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {        int n = 0;        for (E e : c) {            if (e == null)                throw new NullPointerException();            if (n == capacity)                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默认构造函数直接调用LinkedBlockingQueue(int capacity),LinkedBlockingQueue(int capacity)会初始化首尾节点,并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化队列的同时,将一个集合的全部元素加入队列。

最后,重点分析下put和take的过程:

public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();
    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();    try {        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);        c = count.getAndIncrement();        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }    if (c == 0)
        signalNotEmpty();
}
public E take() throws InterruptedException {    E x;
    int c = -1;    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();    try {        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();        c = count.getAndDecrement();        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }    if (c == capacity)
        signalNotFull();    return x;
}

之所以把put和take放在一起,是因为它们是一对互逆的过程:

LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,这也意味着它们之间的操作不会存在互斥。在多个CPU的情况下,可以做到在同一时刻既消费、又生产,做到并行处理。

阻塞队列实现生产者-消费者模式

通过对LinkedBlockingQueue主要源码的分析,实现生产者-消费者模式就变得简单了。

public class MessageQueue {

    private static MessageQueue messageQueue1, messageQueue2, messageQueue3, messageQueue4;    private BlockingQueue<AudioData> audioDataQueue = null;    private MessageQueue() {
        audioDataQueue = new LinkedBlockingQueue<>();
    }    @Retention(SOURCE)    @IntDef({ENCODER_DATA_QUEUE, SENDER_DATA_QUEUE, DECODER_DATA_QUEUE, TRACKER_DATA_QUEUE})    public @interface DataQueueType {
    }    public static final int ENCODER_DATA_QUEUE = 0;    public static final int SENDER_DATA_QUEUE = 1;    public static final int DECODER_DATA_QUEUE = 2;    public static final int TRACKER_DATA_QUEUE = 3;    public static MessageQueue getInstance(@DataQueueType int type) {        switch (type) {            case ENCODER_DATA_QUEUE:
                if (messageQueue1 == null) {
                    messageQueue1 = new MessageQueue();
                }                return messageQueue1;            case SENDER_DATA_QUEUE:
                if (messageQueue2 == null) {
                    messageQueue2 = new MessageQueue();
                }                return messageQueue2;            case DECODER_DATA_QUEUE:
                if (messageQueue3 == null) {
                    messageQueue3 = new MessageQueue();
                }                return messageQueue3;            case TRACKER_DATA_QUEUE:
                if (messageQueue4 == null) {
                    messageQueue4 = new MessageQueue();
                }                return messageQueue4;            default:
                return new MessageQueue();
        }
    }    public void put(AudioData audioData) {        try {
            audioDataQueue.put(audioData);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }    public AudioData take() {        try {            return audioDataQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        return null;
    }
}

这里通过@IntDef来实现限定输入类型的功能,同时,阻塞队列保持单实例,然后将队列分别应用到各个生产者-消费者线程中。在本文的语音对讲系统中,以音频录制线程和编码线程为例,录制线程是音频数据包的生产者,编码线程是音频数据包的消费者。

音频录制线程:

@Overridepublic void run() {    while (isRecording) {        if (audioRecord.getRecordingState() == AudioRecord.RECORDSTATE_STOPPED) {
            audioRecord.startRecording();
        }        // 实例化音频数据缓冲
        short[] rawData = new short[inAudioBufferSize];
        audioRecord.read(rawData, 0, inAudioBufferSize);
        AudioData audioData = new AudioData(rawData);
        MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).put(audioData);
    }
}

编码线程:

@Overridepublic void run() {    AudioData data;
    // 在MessageQueue为空时,take方法阻塞
    while ((data = MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).take()) != null) {        data.setEncodedData(AudioDataUtil.raw2spx(data.getRawData()));        MessageQueue.getInstance(MessageQueue.SENDER_DATA_QUEUE).put(data);
    }}

同样的,编码线程和发送线程,接收线程和解码线程,解码线程和播放线程同样存在生产者-消费者的关系。

二、改进音频录制及播放,提高语音通信质量

目前的语音通信质量,个人感觉仍然需要继续优化,如果您有这方面的经验(包括但不限于Java层和Speex音频处理),不吝赐教!

三、采用多播实现设备发现及跨路由通信

《通过UDP广播实现Android局域网Peer Discovering》中从编程的角度说明了TCP与UDP的区别,主要分析了TCP是面向连接的、可靠的服务,建立连接需要经过三次握手、销毁连接需要四次挥手;UDP是无连接的传输层协议,提供面向事务的简单不可靠信息传送服务。

IP地址分为三类:单播、广播和多播。广播和多播仅用于UDP,它们用于将报文同时传送给多个接收者。广播分为:受限广播、指向网络的广播、指向子网的广播、指向所有子网的广播。

举个栗子:当前IP为10.13.200.16/22,首先广播地址为255.255.255.255,子网广播地址为10.13.203.255。

《通过UDP广播实现Android局域网Peer Discovering》采用子网广播实现局域网Android设备的发现,但在实践中,一般路由器会禁止所有广播跨路由器传输。所以,如果子网内有多个路由器,那么就无法实现设备发现了。因此,本文将设备发现也改为多播实现。多播组地址包括为1110的最高4bit和多播组号,范围为224.0.0.0到239.255.255.255。能够接收发往一个特定多播组地址数据的主机集合称为主机组,主机组可以跨越多个网络。

IANA 把224.0.0.0 到 224.0.0.255 范围内的地址全部都保留给了路由协议和其他网络维护功能。该范围内的地址属于局部范畴,不论生存时间字段(TTL)值是多少,都不会被路由器转发;D类保留地址的完整的列表可以参见RFC1700。
224.0.1.0 到 238.255.255.255 地址范围作为用户组播地址,在全网范围内有效。其中233/8 为 GLOP 地址。GLOP 是一种自治系统之间的组播地址分配机制,将 AS 号直接填入组播地址的中间两个字节中,每个自治系统都可以得到 255 个组播地址;
239.0.0.0 到 239.255.255.255 地址范围为本地管理组播地址(administratively scoped addresses),仅在特定的本地范围内有效。

本文对比了子网广播和多播,子网广播地址为:192.168.137.255,多播组地址为:224.5.6.7。

发送接收采用同一MulticastSocket,MulticastSocket设置TTL,TTL表示跨网络的级数。

try {
    inetAddress = InetAddress.getByName(Constants.MULTI_BROADCAST_IP);
    multicastSocket = new MulticastSocket(Constants.MULTI_BROADCAST_PORT);
    multicastSocket.setLoopbackMode(true);
    multicastSocket.joinGroup(inetAddress);
    multicastSocket.setTimeToLive(4);
} catch (IOException e) {
    e.printStackTrace();
}

joinGroup涉及到另一个协议:网路群组管理协议(Internet Group Management Protocol或简写IGMP),通过抓包可以观察到初始化MulticastSocket时加入组协议的报文。

setTimeToLive用于设置生存时间字段。默认情况下,多播数据报的TTL设置为1,使得多播数据报仅限于在同一个子网内传送,更大的TTL值能够被多播路由器转发。在实际传输过程中,多播组地址仍然需要转换为以太网地址。实际转换规则这里不再赘述。

上述多播地址224.5.6.7转换后为01:00:5e:05:06:07。

代码层面上,探测线程将子网广播改为多播实现。

if (command != null) {
    byte[] data = command.getBytes();
    DatagramPacket datagramPacket = new DatagramPacket(            data, data.length, Multicast.getMulticast().getInetAddress(), Constants.MULTI_BROADCAST_PORT);
    try {        Multicast.getMulticast().getMulticastSocket().send(datagramPacket);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

并且在接收端区分指令和音频数据。

while (true) {    // 设置接收缓冲段
    byte[] receivedData = new byte[512];
    DatagramPacket datagramPacket = new DatagramPacket(receivedData, receivedData.length);    try {        // 接收数据报文
        Multicast.getMulticast().getMulticastSocket().receive(datagramPacket);
    } catch (IOException e) {
        e.printStackTrace();
    }    // 判断数据报文类型,并做相应处理
    if (datagramPacket.getLength() == Command.DISC_REQUEST.getBytes().length ||
            datagramPacket.getLength() == Command.DISC_LEAVE.getBytes().length ||
            datagramPacket.getLength() == Command.DISC_RESPONSE.getBytes().length) {
        handleCommandData(datagramPacket);
    } else {
        handleAudioData(datagramPacket);
    }
}

四、实现对讲进程与UI进程的通信(AIDL)

在实际工程应用场景中,需要对讲机进程即使切换到后台,也依然能收到信息。因此,为了提高进程的优先级,降低被系统回收的概率,采用了在Service中访问网络服务,处理语音信息的发送和接收的方案。前台Activity负责显示组播组内用户(上线和下线,更新页面),通过AIDL与Service进行跨进程通信和回调。Service的清单说明如下:

<service
    android:name=".service.IntercomService"
    android:process=":intercom" />

:intercom表示定义子进程intercom。

使用多进程相比于常见的单进程,有一些需要注意的点:

因此,通过process定义了多进程之后,一定要避免单进程模式下对象共享的思路。另外,在AS中调试多进程应用的时候,断点一定要针对不同的进程,以本文为例,添加断点需要选择主进程和intercom进程。给两个进程分别添加调试断点后,可以看到有两个Debugger:3156和3230(由于存在Jni代码,所以显示了Hybrid Debugger)。

1. 定义AIDL文件

由于既存在Activity到Service的通信,也存在Service接收到消息之后更新Activity页面的需求,所以这里采用了跨进程回调的方式。首先,AIDL方法如下:

package com.jd.wly.intercom.service;import com.jd.wly.intercom.service.IIntercomCallback;interface IIntercomService {    void startRecord();    void stopRecord();    void registerCallback(IIntercomCallback callback);    void unRegisterCallback(IIntercomCallback callback);
}
package com.jd.wly.intercom.service;interface IIntercomCallback {    void findNewUser(String ipAddress);    void removeUser(String ipAddress);
}

IIntercomService定义了Activity到Service的通信方法,包含启动和停止音频录制,以及注册和解除回调接口;IIntercomCallback定义了从Service到Activity的回调接口,用于在Service发现用户上线、下线时通知前台Activity的显示。

AIDL文件的定义涉及一些规范:比如变量在同一包内也需要import,非基本数据类型参数列表需要指明in、out,自定义参数类型需要同时编写java文件和aidl文件等,本文篇幅有限,就不具体展开AIDL跨进程通信的细节了。

2. 从Activity到Service的通信

Activity检测用户的按键操作,然后将事件传递给Service进行对应的逻辑处理。
将Service绑定到Activity首先需要定义ServiceConnection:

/**
 * onServiceConnected和onServiceDisconnected运行在UI线程中
 */private IIntercomService intercomService;private ServiceConnection serviceConnection = new ServiceConnection() {    @Override
    public void onServiceConnected(ComponentName name, IBinder service) {
        intercomService = IIntercomService.Stub.asInterface(service);        try {
            intercomService.registerCallback(intercomCallback);
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }    @Override
    public void onServiceDisconnected(ComponentName name) {
        intercomService = null;
    }
};

在onStart()时绑定Service,onStop()时解除回调和绑定。

@Overrideprotected void onStart() {    super.onStart();
    Intent intent = new Intent(AudioActivity.this, IntercomService.class);
    bindService(intent, serviceConnection, BIND_AUTO_CREATE);
}
@Overrideprotected void onStop() {    super.onStop();    if (intercomService != null && intercomService.asBinder().isBinderAlive()) {        try {
            intercomService.unRegisterCallback(intercomCallback);
        } catch (RemoteException e) {
            e.printStackTrace();
        }
        unbindService(serviceConnection);
    }
}

Activity获取了Service的服务后,分别在按键事件处理中进行调用。

@Overridepublic boolean onKeyDown(int keyCode, KeyEvent event) {    if ((keyCode == KeyEvent.KEYCODE_F2 ||
            keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) {        try {
            intercomService.startRecord();
        } catch (RemoteException e) {
            e.printStackTrace();
        }        return true;
    }    return super.onKeyDown(keyCode, event);
}@Overridepublic boolean onKeyUp(int keyCode, KeyEvent event) {    if ((keyCode == KeyEvent.KEYCODE_F2 ||
            keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) {        try {
            intercomService.stopRecord();
        } catch (RemoteException e) {
            e.printStackTrace();
        }        return true;
    }    return super.onKeyUp(keyCode, event);
}

startRecord和stopRecord的具体实现定义在Service中:

public IIntercomService.Stub mBinder = new IIntercomService.Stub() {    @Override
    public void startRecord() throws RemoteException {        if (!recorder.isRecording()) {
            recorder.setRecording(true);
            tracker.setPlaying(false);
            threadPool.execute(recorder);
        }
    }    @Override
    public void stopRecord() throws RemoteException {        if (recorder.isRecording()) {
            recorder.setRecording(false);
            tracker.setPlaying(true);
        }
    }    @Override
    public void registerCallback(IIntercomCallback callback) throws RemoteException {
        mCallbackList.register(callback);
    }    @Override
    public void unRegisterCallback(IIntercomCallback callback) throws RemoteException {
        mCallbackList.unregister(callback);
    }
};
3. 从Service到Activity的通信

Service通过RemoteCallbackList保持回调方法,使用时首先定义RemoteCallbackList对象,泛型类型为IIntercomCallback。

private RemoteCallbackList<IIntercomCallback> mCallbackList = new RemoteCallbackList<>();

RemoteCallbackList并不是List,内部通过Map来保存,Key和Value分别为IBinder和Callback。

ArrayMap<IBinder, Callback> mCallbacks = new ArrayMap<IBinder, Callback>();

使用RemoteCallbackList回调Activity方法时,通过beginBroadcast获取数量,

/**
 * 发现新的组播成员
 *
 * @param ipAddress IP地址
 */private void findNewUser(String ipAddress) {    final int size = mCallbackList.beginBroadcast();    for (int i = 0; i < size; i++) {
        IIntercomCallback callback = mCallbackList.getBroadcastItem(i);        if (callback != null) {            try {
                callback.findNewUser(ipAddress);
            } catch (RemoteException e) {
                e.printStackTrace();
            }
        }
    }
    mCallbackList.finishBroadcast();
}

removeUser(String ipAddress)方法与findNewUser(String ipAddress)方法类似。它们具体的实现在Activity中:

/**
 * 被调用的方法运行在Binder线程池中,不能更新UI
 */private IIntercomCallback intercomCallback = new IIntercomCallback.Stub() {    @Override
    public void findNewUser(String ipAddress) throws RemoteException {
        sendMsg2MainThread(ipAddress, FOUND_NEW_USER);
    }    @Override
    public void removeUser(String ipAddress) throws RemoteException {
        sendMsg2MainThread(ipAddress, REMOVE_USER);
    }
};

需要注意的是,IIntercomCallback中的回调方法实现并不在UI线程中执行,如果需要更新UI,需要实现多线程调用,多线程依然通过Handler来实现


推荐阅读:
  1. ICU探视对讲
  2. 视频对讲技术

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

消费者 android 对讲机

上一篇:source navigator NG 网址

下一篇:[Unity3D+算法]一小时做个2048

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》