移动开发 \ Android \ 解读Paho MQTT源码

解读Paho MQTT源码

总点击157
简介:这两天要重点突破一下MQTT的东西,找到了它的源码,解读一下,作为下一步优化的路标。

这两天要重点突破一下MQTT的东西, 找到了它的源码,解读一下,作为下一步优化的路标。

Paho是基于socket开做的,本质上还是维持一个长socket。


以TCP socket为例:(org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule)

发起连接

SocketAddress sockaddr = new InetSocketAddress(host,port);

socket = factory.createSocket();

socket.connect(sockaddr,conTimeout*1000);

//传出 接收消息

socket.getInputStream();

//传入 心跳、发布消息

socket.getOutputStream();

中断连接

if (socket != null) {

    socket.close();

}

这么简单的代码怎么搭起一个信息管理中心呢?


那怎么知道socket什么时候断呢?这时候就不得不提到一个接口:

public interface MqttCallback {

    public void connectionLost(Throwable cause);

    public void messageArrived(String topic,MqttMessage message) throws Exception;

    public void deliveryComplete(IMqttDeliveryToken token);

}

“connectionLost” 

好在connectionLost 只有一个入口:org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost

这个入口也只有一次调用:org.eclipse.paho.client.mqttv3.internal.CommsCallback.shutdownConnection


找了一下调用,发现就是在“每个消息接受/发出”时(即getInputStream、getOutputStream 读写处),检查Exception,有Exception就提示客户端中断连接。

由这样的设计可以知道,这个函数connectionLost 的最大误差在一个“心跳周期”。

(当然可以通过各种 “优化手段” 去优化,不过最恶劣的情况就是这样了。)


基于getInputStream、getOutputStream,Paho封装了两个方法:CommsReceiver 、CommsSender。很直观的名字,里面也很大方的开了两条线程,以CommsSender为例子:

线程在这里开启:(发现这个库很多地方都是这样开线程)

public void start(String threadName) {

    synchronized (lifecycle) {

        if (!running) {

            running = true;

            sendThread = new Thread(this,threadName);

            sendThread.start();

        }

    }

}

然后这样做:

out = new MqttOutputStream(clientState,OutputStream );


public void run() {

    while (running && (out != null)) {

        message = clientState.get();


        out.write(message);

        out.flush();

    }

}

简单的说 就是不断的从clientState里面拿信息,往socket里面送。其实也不是“不断”,在clientState.get()里面有锁,一个消息解一次锁,解开了就发一次,不然就在clientState.get死循环等 锁,那怎么等呢?

protected MqttWireMessage get() throws MqttException {

    synchronized (queueLock) {

        while (result == null) {

            queueLock.wait();


            result = (MqttWireMessage)pendingMessages.elementAt(0);

        }

    }

}

看到queueLock.wait();就是在这里 等,pendingMessages就是消息队列。要找什么时候发消息?就找什么时候这个锁被解开了。


写了这个多,其实也就想问一个简单的问题,怎么发起一次心跳?

最直接就找名字,找到了这个接口 org.eclipse.paho.client.mqttv3.MqttPingSender。

这个接口在库里面只看到一个实现org.eclipse.paho.client.mqttv3.TimerPingSender。

注意到里面有这样的代码:

public void schedule(long delayInMilliseconds) {

    timer.schedule(new PingTask(),delayInMilliseconds);       

}

private class PingTask extends TimerTask {

    public void run() {

        comms.checkForActivity();           

    }

}

checkForActivity进去,又包了一层,

public MqttToken checkForActivity(){

    MqttToken token = null;

    token = clientState.checkForActivity();

    return token;

}

再进去:

public MqttToken checkForActivity() throws MqttException {

    ...

    pingSender.schedule(nextPingTime);

    return token;

}

看到pingSender.schedule,这里定时,准备做下一次心跳。

这个是维持心跳的方法。


直到后面我看了Paho给的android的例子,才知道原来MqttPingSender这样做是为了易于扩展。

org.eclipse.paho.android.service.AlarmPingSender

@Override

public void schedule(long delayInMilliseconds) {

    long nextAlarmInMilliseconds = System.currentTimeMillis()

            + delayInMilliseconds;

    AlarmManager alarmManager = (AlarmManager) service

            .getSystemService(Service.ALARM_SERVICE);

    alarmManager.set(AlarmManager.RTC_WAKEUP,nextAlarmInMilliseconds,

            pendingIntent);

}

class AlarmReceiver extends BroadcastReceiver {

    @Override

    public void onReceive(Context context,Intent intent) {

        ...

    }

}

在android里面,休眠后Timer无效,只能用这种迂回的方式去定时,也是因为有这种迂回的方式,Paho库才把发送Ping的方法收得这么深。

也是因为这么深,很多之前用IBM的MQTT库的人,比如我,才不得不去看看具体的代码怎么跑。其实上面还有一个问题,就是AlarmManager定时在小米系统上无效,被定义为300S对齐唤醒,代码中貌似没有看到对“这种行为”的处理。

有一个小问题,不知道是不是设计上的遗漏,org.eclipse.paho.client.mqttv3.internal.ClientComms,ClientComms是internal的方法,居然可以在外面使用。


回到问题原点,怎么发起一次心跳?在上面的分析中,看到checkForActivity好像有点痕迹,checkForActivity的注释里面也有写明。(Check and send a ping if needed and check for ping timeout)简化那一堆代码,简单的就是这样做:


public MqttToken checkForActivity() throws MqttException {

   

    pendingFlows.insertElementAt(pingCommand,0);

    notifyQueueLock(); //Wake sender thread since it may be in wait state (in ClientState.get()) 这里解锁


    return token;

}


心跳包怎么做呢?

//org.eclipse.paho.client.mqttv3.internal.CommsSender

public void run() {

    MqttWireMessage message = clientState.get();

    out.write(message);

    out.flush();

}

//MqttOutputStream

public void write(MqttWireMessage message) throws IOException,MqttException {

    final String methodName = "write";

    byte[] bytes = message.getHeader();

    byte[] pl = message.getPayload();

    out.write(bytes,bytes.length);

    clientState.notifySentBytes(bytes.length);


    int offset = 0;

    int chunckSize = 1024;

    while (offset < pl.length) {

        int length = Math.min(chunckSize,pl.length - offset);

        out.write(pl,offset,length);

        offset += chunckSize;

        clientState.notifySentBytes(length);

    }       

}

//MqttWireMessage

public byte[] getHeader() throws MqttException {

    try {

        int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);

        byte[] varHeader = getVariableHeader();

        int remLen = varHeader.length + getPayload().length;


        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        DataOutputStream dos = new DataOutputStream(baos);

        dos.writeByte(first);

        dos.write(encodeMBI(remLen));

        dos.write(varHeader);

        dos.flush();

        return baos.toByteArray();

    } catch(IOException ioe) {

        throw new MqttException(ioe);

    }

}

//MqttPingReq

public class MqttPingReq extends MqttWireMessage {

    protected byte[] getVariableHeader() throws MqttException {

        return new byte[0];

    }

}

皮很厚,MqttWireMessage getHeader中,一共有两个字节(不知道有没有数错...)

第一部分有Type和Message组成,ping的type是12,信息长度是0,记作first(头部),两个字节,回头看看协议:

bit

7

6

5

4

3

2

1

byte 1

Message Type

DUP flag

QoS level

RETAIN

byte 2

Remaining Length

...http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html


怎么收到消息?

public void run() {

    final String methodName = "run";

    MqttToken token = null;


    while (running && (in != null)) {

        try {

            receiving = in.available() > 0;

            MqttWireMessage message = in.readMqttWireMessage();

            receiving = false;


            if (message instanceof MqttAck) {

                token = tokenStore.getToken(message);

                if (token!=null) {

                    synchronized (token) {

                        clientState.notifyReceivedAck((MqttAck)message);

                    }

                } else {

                    throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);

                }

            } else {

                clientState.notifyReceivedMsg(message);

            }

        }finally {

            receiving = false;

        }

    }

}

意思大概就是不断的循环读InputStream的数据流。这里有一个问题,休眠了,整个android世界是停止工作了。线程什么的都会被挂起。

问题是“都被挂起了,循环读还有作用吗?”记得之前有一篇文章:http://my.oschina.net/u/1999248/blog/591440

摘要就是:通讯协议栈运行于BP,一旦收到数据包,BP会将AP唤醒,唤醒的时间足够AP执行代码完成对收到的数据包的处理过程。

就是有数据包来的时候,BP会唤醒CPU,CPU起来干活之后就接着循环读,就读到推送送过来的新鲜的消息了。BP耗电低于AP的1/10,类似收到短信、电话都是由BP进行监控。


读了一轮,有几点收获:

1、重新认识了socket。

2、对MQTT PING的认知多了一点。

3、维持一条线程的开关,以及Paho的封装方式都挺有意思的。

4、通过源码,理解的MQTT的部分性能底线。例如,以前觉得MQTT lostconnect应该是准的,看完代码才知道原来还有一个心跳周期的误差,不是设计没做好,而是“设计+实际情况”使然。


参考实现:(官网)

git clone https://git.eclipse.org/r/paho/org.eclipse.paho.apps


转载请保留本文地址-http://blog.csdn.net/yeshennet/article/details/50708115


意见反馈 常见问题 官方微信 返回顶部