注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

沙漠里de烟雨

原创分享,禁止转载

 
 
 

日志

 
 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能  

2017-01-08 13:47:24|  分类: QT5.x与QML |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

很久没有用这个博客了,之前本来想在csdn里写,无奈管理员莫名其妙的关了我的博客,比较了其它的一些博客,不是太杂乱就是感觉太复杂,还是网易来得简单些。只是可惜不能传资源,只能尽量贴代码,如果代码量较多,则提供csdn的下载路径。

如题,贴代码前,把设计文档贴上,这样,看起代码就没有这么困难了。

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

下面就来详细设计一下消息交换中心。为了以后方便加入新的功能模块,以及能够支持深度二次开发,有必要将它设计成既能与程序的模块间进行通信,也能与新添的独立的新程序进行通信。

1)数据包结构如下所示,其中包头8个字节,加上最后一个结束符1个字节,中间的数据部分可任意长度,不过为了安全起见,建议不要超过1000个字节,即x<=1000。结构这样排列的用意是根据数据被访问的频率和先后顺序来排序的,这里只是一个小小的优化。

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

因为这些结构必须要有,所以为减少传输的数据量,包头尽量压减,前四个均以一个字符来区分,其实1个字节,它可以有255种情况,除去特定信号的情况,光可显示的字符都有90多个(可显,方便测试),所以对于软件内部使用的话,完全够用,因为不会有这么多的模块需要相互通信,也不会有这么多种内部指令需要处理。后面添加一个数据包结束符,是为了验证这个包是否完整,对于粘包情况,很容易做区分。这里没有采用MD5做校验,原因是体系内部通信,不会有人恶意修改数据,所以没必要做MD5校验,对于不必要的负担,能省则省。消息交换是个体力活,应该按简设计,以求高效。第一个字节为消息标识,是为了区分此消息是否是合法消息,即是否是模块或用户发送过来的消息。不合法的消息,直接丢弃。因为有数据包结束符的检测,所以不用担心因消息被分段收取而错把正常消息误弃。

 

2)工作原理

这里,将消息交换中心简称为交换机,以下同。

首先,当新用户程序(或新模块,这里统称客户)首次接入交换机时,先跟交换机预约一个身份ID,然后交换机会查看登记列表,分配一个身份ID给此客户(不一定是预约的ID),并将此ID发送给此客户,客户得到此ID并保存,以便在今后的数据封装时带上自己的ID。此时,客户便可以与其它客户进行通信。下面列一下ID分布,如下:

ID ==0  交换机ID (==>0,代表原始,基础之意)

ID ==A-Z 模块ID (==>大写字母)

ID == a-z 外来程序ID (==>小写字母)

现在,来设计一下消息通信:

如客户A向客户B发送消息,包装数据时,分别填入A自己的ID和对方BID,并将数据及其数据长度填入数据包中,打上结束符标记,发送到交换机,交换机接收数据包,检测第一个字节,看是否是系统内指定的标识符,如果不是,丢弃数据。并返回给A约定标识不匹配(暗号对不上)。如果是,检测第二个字节,查看内部指令,如果是广播指令,检查最后一个结束符,如果不是,可能没有接收完,待接收完数据,检查出结束符,将整个数据包原封不动的发送到除客户A除外的客户(通过遍历客户列表),如果是单向通信,则再看第三个字节,根据接收者ID,将数据发送给相应的B客户。当客户B收到此消息后,剥离出真实数据,至于A想要求B响应什么指令,那是AB之间的协议,千言万语都在此数据包的Data中,对交换机而言是透明的,交换机只作转发之事(邮递员大叔才没时间拆看你俩的书信呢,他只负责把信件带到)。至于AB这些客户,可对存取感兴趣的对方的ID,然后是否可达,这些客户并不清楚,也不能保证,所以这些事就交给交换机了(就像邮寄,客户并不知道其它客户是否健在,门牌号是否更换,是否换了住址,只要按预先知道的地址把信件发出去即可,至于可不可达,那是交换机要关心的事,交换机中的客户列表中如果存在,则发送,如果不存在,则退件,即通知发送方目的不可达,将数据包原路退回),一旦有ID退出,便将此ID从客户列表中删除,下次再有对此ID的通信,都会被退回(当然数据不会再发回,没此必要,只要通知发送者,此ID的客户已经不在线了,让它下次不要再给此ID发送任何数据),这里,目的不可达而退件,不是把数据包丢弃,而是原封不动的将信件退回,只是修改了内部指令的代码(打了一个不可达的邮戳)。这里这样设计是有所考虑的:如果客户A同时给两个客户BC发了数据两份不同的,B的到不了,如果交换机将不可达的数据丢弃,只返回给客户说此数据不可达,已丢弃,由于A客户可能给B客户发了很多条信息,它不清楚到底是哪条没发送成功。当然直接丢弃也是可以的,只是在客户A处需要维护一个发送历史记录,且数据结构中要加入一个数据编号,这势必增加了系统的复杂度。所以直接将信件打回方是上策。

下面列出设计出的相关字段(均是Acsii码字符):

消息标识符:@ (==>代表发消息,发信件之意)

结束标志符:. (==>代表一句话讲完之意)

发送、接收客户ID:交换机ID,模块ID以及外部程序ID。上面已经列出,不累述。

 

内部指令(交换机需要读懂的指令)

-->通知:a (==>代表颁发(award) ID)

-->广播:* (==>代表客户要向全网广播,所有其他客户都会收到)

-->定向单播:> (==>代表客户指定要向某个客户私信)

-->目的不可达,信件退回:x (==>代表交换机告知客户,目的不可达,并退件)

-->非法消息,未知消息标识符:?(==>代表交换机告知客户,为非法消息,并退件)

[注:因为返回此通告时首字节为正确的标识符,所以可以拿到合法的消息标识符]

-->客户已达最大连接数:m  (==> 代表max之意,内部模块26,外部程序26)

 

客户询问-->请求告知目的客户是否可达:[ (==>代表客户向交换机询问单个客户可达性)

交换机回复-->回复客户目的客户是否可达:] (==>代表交换机回复请求)

客户询问-->请求告知所有在线的客户列表:{ (==>代表客户向交换机请求客户列表)

交换机回复-->回复客户所有在线的客户列表:} (==>代表交换机回复请求)

 

依上不同情况,列出下列的几种数据结构:

1)客户连接上时发送预约ID;

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

2)客户颁发ID(不一定是预约的ID)

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

3)定向单播:

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

4)广播:

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

 

5)退件之目的不可达(单播时)

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

6)退件之消息非法、未知消息标识符(接收者可能无ID,也可能有ID,均以?号替代):

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

7)交换机已达到最大的客户连接数,通知后断开与此新客户的连接(接收者无ID,?号替代):

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

8)请求告知目的是否可达:

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

9)回复客户目的地是否可达:

可达的情况(接收者ID不会被修改,说明可达)

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

不可达的情况(接收者ID被修改成交换机的ID后发回)

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

10)请求目的可达的所有其它用户的用户列表:

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

11)回复客户所要的所有在线的客户列表

没有其它客户在线时的情况(没有Data数据,根据第五个字节即可判断)

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

有其它客户在线时的情况(数据Data中是交换机填入的数据,为所有的在线、目的可达的、除发送者之外的客户列表)

 

Qt 5.x 自已开发的消息中心网络通信模块,类似交换机功能 - 漠雨 - 沙漠里de烟雨__风尘无名

 

这里,一直没有对数据Data部分进行说明,并非避而不谈,只是因为它有些故事需要单独拿出来讲,所以在此节只字未提,以清视听。

当然,还可以再添加些内部指令,然交换机知道的无非也就那么点事,客户想知道的都可以从交换机中获得,所以也没什么可以添加了,故而不必再多事。

其实数据Data可以是客户间订立的协议,也可以是一些其它纯粹的数据,如果仅是协议的话,只须双方遵从约定,沟通便可以顺行(什么是协议呢?如上所示的数据包结构,内部指令等,都属于协议的一部分,只是,它是客户与交换机之间的协议,而客户与客户之间的协议,自然是隐藏在这份协议的数据Data之内,双方根据业务的需求来制定。这好比间谍密语,一封看似普通的信,也许内藏机密,只有知道密码方可破译,而这份密码,便是这里所说的协议)。但如果是一些业务数据,因为可能它会牵涉到数据表现或是数据处理,而且这些业务数据的量有时比较庞大,连续不断,就会用到一些数据结构,所以需要把这些数据结构转换成字节块,方可放在此数据Data块中。所以就有了序列化以及之后的反序列化之说。关于序列化与反序列化,这里就不再谈及了。

 

此设计已经经过测试,也已经应用到我的一些项目中,所以,如果您的应用场合不是要求高并发的话,这是一个不错的架构。 

代码如下,有四个文件,用法这里就不贴了,public和signals都是可以调用的,所以接口也是一目了然。

//MessageClient .h

#ifndef MESSAGECLIENT_H
#define MESSAGECLIENT_H

#include <QObject>
#include <QTcpSocket>

class MessageClient : public QObject
{
    Q_OBJECT
public:
    MessageClient(char id,uint port,const QString& server,QObject* parent=0);//id为客户预约的Id,真实拿到的ID不一定是预约的ID,这个得注意了;
    ~MessageClient();
signals://发送给其它的对象,比如UI;
    void    RecvDataSignal(char senderId,QByteArray data);
    void    RecvBroadcastSignal(char senderId,QByteArray data);
    void    UnreachableMessageSignal(char destId,QByteArray data);
    void    InvaildMessageSignal();
    void    MaxConnectedSignal();
    void    RspDestClientStateSignal(char destId,bool online);
    void    RspClientListSignal(QByteArray data);

public slots: //因为防止界面的循环造成socket的发送与接收阻塞,应单独放入一线程,并通过信号来与这些槽函数相连,以达到控制的目的;
    void    StartSlot();
    void    StopSlot();
    void    BroadcastSlot(QByteArray data);
    void    UnicastSlot(char destClient,QByteArray data);
    void    ReqDestClientStateSlot(char destClient);
    void    ReqClientListSlot();
    void    CloseClientSlot();
private slots:
    void    ReadyReadSlot();
    void    GetErrorMsgSlot(QAbstractSocket::SocketError err);
    void    StateChangedSlot(QAbstractSocket::SocketState state);
public:
    void    Start(); //启动连接;
    void    Stop(); //断开与服务器的连接;
    char    GetSelfClientID(); //获取自身的客户ID;
public:
    void    Broadcast(QByteArray data); //广播(广播发送);
    void    Unicast(char destClient,QByteArray data); //单播(定向发送);
    void    ReqDestClientState(char destClient); //请求目的客户在线状态;
    void    ReqClientList(); //请求在线客户列表;
public://virtual;可以重载;
    virtual void RecvData(char senderId,QByteArray data); //获取到的普通数据;
    virtual void RecvBroadcast(char senderId,QByteArray data); //获取到广播数据;
    virtual void UnreachableMessage(char destId,QByteArray data); //获取到不可达的退件数据;
    virtual void InvaildMessage();//获取到的从交换机定义的非法消息的回复;
    virtual void MaxConnected(); //交换机已达最大连接数时的回复;
    virtual void RspDestClientState(char destId,bool online); //响应目的客户在线状态的请求;
    virtual void RspClientList(QByteArray destClients); //响应在线客户列表的请求;

private:
    void        SendMsg(QByteArray bytes); //发送消息;
    uint        GetUint(QByteArray bytes); //根据字节内容转换成数据长度;
    QByteArray  GetByteArray(uint num); //根据数据长度转换成字节内容;
private:
    uint        m_Port; //端口号;
    QString     m_Server; //服务器地址;
    char        m_ClientID; //本客户ID;
    QByteArray  m_RecvSurplus; //剩余数据段;
private:
    QTcpSocket* m_TcpSocket;
};

#endif // MESSAGECLIENT_H


//MessageClient .cpp

#include "MessageClient.h"

#include <QByteArray>
#include <QMetaType>
#include <QDebug>
MessageClient::MessageClient(char id,uint port,const QString& server,QObject* parent) : QObject(parent)
{
    m_ClientID = '\0';
    if( (id>='A'&&id<='Z') || (id>='a'&&id<='z'))
    {
        m_Port = port;
        m_Server = server;
        m_ClientID = id;
        m_TcpSocket = new QTcpSocket(this);
        m_TcpSocket->setSocketOption(QAbstractSocket::KeepAliveOption,1);//立即发送;

        //注册特殊类型;
        qRegisterMetaType<QAbstractSocket::SocketState>("QAbstractSocket::SocketState");
        qRegisterMetaType<QAbstractSocket::SocketError>("QAbstractSocket::SocketError");

        connect(m_TcpSocket,SIGNAL(stateChanged(QAbstractSocket::SocketState)),this,SLOT(StateChangedSlot(QAbstractSocket::SocketState)));
        connect(m_TcpSocket,SIGNAL(readyRead()),this,SLOT(ReadyReadSlot()));
        connect(m_TcpSocket,SIGNAL(error(QAbstractSocket::SocketError)),this,SLOT(GetErrorMsgSlot(QAbstractSocket::SocketError)));
    }
}

MessageClient::~MessageClient()
{
    if(m_TcpSocket->isValid())
    {
        m_TcpSocket->close();
        m_TcpSocket->deleteLater();
    }
  //  qDebug() << "Client: " << m_ClientID << " disconnect!" << endl;
}

void MessageClient::StartSlot()
{
    this->Start();
}

void MessageClient::StopSlot()
{
    this->Stop();
}

void MessageClient::BroadcastSlot(QByteArray data)
{
    this->Broadcast(data);
}

void MessageClient::UnicastSlot(char destClient,QByteArray data)
{
    this->Unicast(destClient,data);
}

void MessageClient::ReqDestClientStateSlot(char destClient)
{
    this->ReqDestClientState(destClient);
}

void MessageClient::ReqClientListSlot()
{
    this->ReqClientList();
}

void MessageClient::CloseClientSlot()
{
    if(m_TcpSocket->isValid())
    {
     //   m_TcpSocket->close();
        m_TcpSocket->abort();
        m_TcpSocket->deleteLater();
    }
    this->destroyed(this);
}

void MessageClient::ReadyReadSlot()
{
    QByteArray bytes = m_TcpSocket->readAll();

    //未收到数据,直接返回;
    if(bytes.length()<=0)
        return ;

    //将上次剩余的数据拼接在最前面;
    if(m_RecvSurplus.length()>0)
    {
        bytes.push_front(m_RecvSurplus);
    }

    while(bytes.length()>=5) //有意义的数据包至少5个字节;
    {
        //收到的消息为非法消息,直接丢弃(返回);
        if(bytes[0]!='@') //首字符必须为'@',否则被视为非法消息;
        {
            m_RecvSurplus.clear();
            return ;
        }

        char cmd = bytes[1]; //内部指令;

        //前五个字节中包含一条有意义的指令;
        if(bytes[4]=='.')
        {
            if(cmd == 'a') //交换机给此客户颁发的通信ID;
            {
                m_ClientID = bytes[2];
            //    qDebug() << "Client: " << m_ClientID << " connected!" << endl;
            }
            else if(cmd == '?') //此客户机因发送了非法消息而被交换机通告
            {
                this->InvaildMessage();
            }
            else if(cmd == 'm') //交换机因达到最大连接数而通知此客户已达最大连接数,此客户收到此数据后便会知晓连接被断开的原因;
            {
                this->MaxConnected();
            }
            else if(cmd == ']') //获得询问的目的客户是否可达的状态信息;
            {
                this->RspDestClientState(bytes[2],bytes[2]!='0');
            }
            else if(cmd == '}') //获得询问所有其它在线客户的客户列表之无其它客户在线时;
            {
                this->RspClientList(QByteArray());
            }

            //剩余的字节;
            bytes = bytes.right(bytes.length()-5);
        }
        else //说明是第二种数据结构的情况;
        {
            if(bytes.length() <= 9) //说明数据不全,需要下次接收时再一起拼接,跳出循环;
                break;

            uint data_len = GetUint(bytes.mid(4,4)); //数据长度;

            if(data_len==0 || data_len>1000)  //无数据或数据超长,残忍丢弃;
            {
              //  qDebug() << "data_len==0 or data_len>1000, droped!" << endl;
                m_RecvSurplus.clear();
                return ;
            }

            if((uint)bytes.length() < 9+data_len) //说明数据不全,需要下次接收时再一起拼接,跳出循环;
                break;

            if(bytes[8+data_len] != '.') //说明前9+data_len个字节不是一条有意义的指令,因为无结束符,无法分割,残忍丢弃;
            {
              //  qDebug() << "find not the terminator, droped!" << endl;
                m_RecvSurplus.clear();
                return ;
            }

            //前9+data_len个字节包含一条有意义的指令;
            if(cmd == '>') //单播数据;
            {
                this->RecvData(bytes[3],bytes.mid(8,data_len));
            }
            else if(cmd == '*') //广播数据;
            {
                this->RecvBroadcast(bytes[3],bytes.mid(8,data_len));
            }
            else if(cmd == 'x') //退件之目的不可达;
            {
                this->UnreachableMessage(bytes[2],bytes.mid(8,data_len));
            }
            else if(cmd == '}') //获得询问所有其它在线客户的客户列表之存在其它在线客户时;
            {
                this->RspClientList(bytes.mid(8,data_len));
            }

            //剩余的字节;
            bytes = bytes.right(bytes.length()-data_len-9);
        }
    }

    //先清空剩余字节(因为前面已经将它拼接);
    m_RecvSurplus.clear();

    //如果还剩下有未处理的字节,将之存放在m_RecvSurplus变量中;
    if(bytes.length()>0)
    {
        m_RecvSurplus.push_back(bytes);
    }
}

void MessageClient::GetErrorMsgSlot(QAbstractSocket::SocketError err)
{
  //  qDebug() << m_TcpSocket->errorString() << "," << err << endl;
}

void MessageClient::StateChangedSlot(QAbstractSocket::SocketState state)
{
    if(QAbstractSocket::ConnectedState == state)
    {
        //预约用户ID;
        if(m_ClientID != '\0')
        {
            QByteArray bytes;
            bytes.append("@a0");
            bytes.append(m_ClientID);
            bytes.append('.');
            this->SendMsg(bytes);
      //      qDebug() << m_ClientID << endl;
        }
    }
    else if(QAbstractSocket::UnconnectedState == state)
    {
      //  qDebug() << "disconnected..." << endl;
    }
}

void MessageClient::Start()
{
    if(m_TcpSocket && (m_TcpSocket->state()==QAbstractSocket::UnconnectedState))
    {
        m_RecvSurplus.clear();
        m_TcpSocket->connectToHost(m_Server,m_Port);
        bool ok = m_TcpSocket->waitForConnected();
     //   qDebug() << (ok?"connected ok!":"connected failed!") << endl;
    }
}

void MessageClient::Stop()
{
    m_TcpSocket->disconnectFromHost();
    bool ok = m_TcpSocket->waitForDisconnected();
  //  qDebug() << (ok?"disconnected ok!":"disconnected failed!") << endl;
}

char MessageClient::GetSelfClientID()
{
    return m_ClientID;
}

void MessageClient::Broadcast(QByteArray data)
{
    uint len = data.length();
    if( (m_ClientID !='\0') && (len>0 && len<1010) )
    {
        QByteArray bytes;
        bytes.append("@*0");
        bytes.append(m_ClientID);
        bytes.append(GetByteArray(len));
        bytes.append(data);
        bytes.append('.');
        this->SendMsg(bytes);
    }
}

void MessageClient::Unicast(char destClient,QByteArray data)
{
    uint len = data.length();
    if( (m_ClientID !='\0') && (len>0 && len<1010) &&
        ((destClient>='A'&&destClient<='Z')||(destClient>='a'&&destClient<='z')) )
    {
        QByteArray bytes;
        bytes.append("@>");
        bytes.append(destClient);
        bytes.append(m_ClientID);
        bytes.append(GetByteArray(len));
        bytes.append(data);
        bytes.append('.');
        this->SendMsg(bytes);
    }
}

void MessageClient::ReqDestClientState(char destClient)
{
    if( (m_ClientID !='\0') &&
        ((destClient>='A'&&destClient<='Z') || (destClient>='a'&&destClient<='z')) )
    {
        QByteArray bytes;
        bytes.append("@[");
        bytes.append(destClient);
        bytes.append(m_ClientID);
        bytes.append('.');
        this->SendMsg(bytes);
    }
}

void MessageClient::ReqClientList()
{
    if(m_ClientID !='\0')
    {
        QByteArray bytes;
        bytes.append("@{0");
        bytes.append(m_ClientID);
        bytes.append('.');
        this->SendMsg(bytes);
    }
}

void MessageClient::RecvData(char senderId,QByteArray data)
{
    emit RecvDataSignal(senderId,data);
}

void MessageClient::RecvBroadcast(char senderId, QByteArray data)
{
    emit RecvBroadcastSignal(senderId,data);
}

void MessageClient::UnreachableMessage(char destId,QByteArray data)
{
    emit UnreachableMessageSignal(destId,data);
}

void MessageClient::InvaildMessage()
{
    emit InvaildMessageSignal();
}

void MessageClient::MaxConnected()
{
    emit MaxConnectedSignal();
}

void MessageClient::RspDestClientState(char destId,bool online)
{
    emit RspDestClientStateSignal(destId,online);
}

void MessageClient::RspClientList(QByteArray destClients)
{
    emit RspClientListSignal(destClients);
}

void MessageClient::SendMsg(QByteArray bytes)
{
    uint size = m_TcpSocket->write(bytes);
    m_TcpSocket->flush();
    while(size<(uint)bytes.length())
    {
        size += m_TcpSocket->write(bytes.right(bytes.length()-size));
        m_TcpSocket->flush();
    }
}

uint MessageClient::GetUint(QByteArray bytes)
{
    uint sum = 0;
    if(bytes.length()==4)
    {
        sum = (bytes[0]-48)*1000 +
                (bytes[1]-48)*100 +
                (bytes[2]-48)*10 +
                (bytes[3]-48);
    }
    return sum;
}

QByteArray MessageClient::GetByteArray(uint num)
{
    QByteArray bytes;
    if(num>0 && num<1024)
    {
        if(num<10)
        {
            char n = num+48;
            bytes.append("000");
            bytes.append(n);
        }
        else if(num<100)
        {
            char n = num/10+48;
            bytes.append("00");
            bytes.append(n);
            n = num%10+48;
            bytes.append(n);
        }
        else if(num<1000)
        {
            char n = num/100+48;
            bytes.append('0');
            bytes.append(n);
            n = num%100/10+48;
            bytes.append(n);
            n = num%100%10+48;
            bytes.append(n);
        }
        else
        {
            char n = num/1000+48;
            bytes.append(n);
            n = num%1000/100+48;
            bytes.append(n);
            n = num%1000%100/10+48;
            bytes.append(n);
            n = num%1000%100%10+48;
            bytes.append(n);
        }
    }
    return bytes;
}

 

//MessageCenter.h

#ifndef MESSAGECENTER_H
#define MESSAGECENTER_H


#include <QObject>
#include <QTcpSocket>
#include <QTcpServer>

class MessageCenter : public QObject
{
    Q_OBJECT
public:
    MessageCenter(QObject *parent=0);
private slots:
    void    NewConnectionSlot();
    void    ReadyReadSlot();
    void    StateChangedSlot(QAbstractSocket::SocketState state);
public:
    void    Start();
private:
    void        FirstConn(char senderId,QTcpSocket* newSocket);
    void        Broadcast(char senderId,QByteArray bytes);
    void        SendMsg(QTcpSocket* socket, QByteArray bytes);
    char        DistributeId(char senderId,const QString& address);
    uint        GetUint(QByteArray bytes);
    QByteArray  GetBytesOfDataLen(uint num);
    QByteArray  GetBytesOfOnlineClientId(char senderId);
private:    //配置参数;
    uint       m_MaxMsgLen;
private:
    QTcpServer*             m_TcpServer;
    QMap<char,QTcpSocket*>  m_ClientMap;
    QMap<int,QByteArray>    m_SurplusDataMap;
};

#endif // MESSAGECENTER_H

 

//MessageCenter.cpp

#include "MessageCenter.h"

MessageCenter::MessageCenter(QObject *parent) : QObject(parent)
{
    m_TcpServer = new QTcpServer(this);
}

void MessageCenter::NewConnectionSlot()
{
    qDebug() << "new connection..." << endl;
    QTcpSocket* newSocket = m_TcpServer->nextPendingConnection();
    newSocket->setReadBufferSize(2000);
    newSocket->setSocketOption(QAbstractSocket::LowDelayOption,1);
    connect(newSocket,SIGNAL(readyRead()),this,SLOT(ReadyReadSlot()));
    connect(newSocket,SIGNAL(stateChanged(QAbstractSocket::SocketState)),this,SLOT(StateChangedSlot(QAbstractSocket::SocketState)));
}

void MessageCenter::ReadyReadSlot()
{
    QTcpSocket* socket = qobject_cast<QTcpSocket*>(sender());
    if(!socket)
        return ;

    QByteArray bytes = socket->readAll();

    if(m_SurplusDataMap.keys().contains(int(socket)))
    {
        //未收到数据,直接返回;
        if(bytes.length()<=0)
            return ;

        QByteArray& surplusData = m_SurplusDataMap[int(socket)]; //注意是引用;

        //将上次剩余的数据拼接在最前面;
        if(surplusData.length()>0)
        {
            bytes.push_front(surplusData);
        }

        while(bytes.length()>=5) //有意义的数据包至少5个字节;
        {
            //收到的消息为非法消息,通告给当有客户此消息为非法消息;
            if(bytes[0]!='@') //首字符必须为'@',否则被视为非法消息;
            {
                QByteArray bytes;
                bytes.append("@??0.");
                this->SendMsg(socket,bytes);
                surplusData.clear();
                return ;
            }

            char cmd = bytes[1]; //内部指令;

            //前五个字节中包含一条有意义的指令;
            if(bytes[4]=='.')
            {
                if(cmd == '[') //客户请求告知目的客户是否可达;
                {
                    bytes[1] = ']';
                    if(!m_ClientMap.keys().contains(bytes[2]))
                        bytes[2] = '0'; //被修改为'0',说明不可达;
                    this->SendMsg(socket,bytes.left(5));    //回复是否可达;
                }
                else if(cmd == '{') //客户机请求的所有其它在线的客户的列表;
                {
                    bytes[1] = '}';
                    QByteArray idList = GetBytesOfOnlineClientId(bytes[3]);
                    if(idList.length() <= 0)
                    {
                        this->SendMsg(socket,bytes.left(5)); //回复无其它任何客户,此时数据包不作任何修改直接打回;
                    }
                    else
                    {
                        QByteArray tmp = bytes.left(4);
                        tmp.append(GetBytesOfDataLen(idList.length()));
                        tmp.append(idList);
                        tmp.append('.');
                        this->SendMsg(socket,tmp); //回复其它在线的客户列表;
                    }
                }

                //剩余的字节;
                bytes = bytes.right(bytes.length()-5);
            }
            else //说明是第二种数据结构的情况;
            {
                if(bytes.length() <= 9) //说明数据不全,需要下次接收时再一起拼接,跳出循环;
                    break;

                uint data_len = GetUint(bytes.mid(4,4)); //数据长度;

                if(data_len==0 || data_len>1000)  //无数据或数据超长,残忍丢弃;
                {
            //        qDebug() << "data_len==0 or data_len>1000, droped!" << endl;
                    surplusData.clear();
                    return ;
                }

                if((uint)bytes.length() < 9+data_len) //说明数据不全,需要下次接收时再一起拼接,跳出循环;
                    break;

                if(bytes[8+data_len] != '.') //说明前9+data_len个字节不是一条有意义的指令,因为无结束符,无法分割,残忍丢弃;
                {
             //       qDebug() << "find not the terminator, droped!" << endl;
                    surplusData.clear();
                    return ;
                }

                //前9+data_len个字节包含一条有意义的指令;
                if(cmd == '>') //单播数据;
                {
                    if(m_ClientMap.keys().contains(bytes[2])) //接收者在线,转发;
                    {
                        this->SendMsg(m_ClientMap[bytes[2]],bytes.left(9+data_len));
                    }
                    else //目标不可达,修改内部指令,然后退件;
                    {
                        bytes[1]='x';
                        this->SendMsg(socket,bytes.left(9+data_len)); //回复数据不可达;
                    }
                }
                else if(cmd == '*') //广播数据;
                {
                    this->Broadcast(bytes[3],bytes.left(9+data_len)); //将广播类消息广播出去;
                }

                //剩余的字节;
                bytes = bytes.right(bytes.length()-data_len-9);
            }
        }

        //先清空剩余字节(因为前面已经将它拼接);
        surplusData.clear();

        //如果还剩下有未处理的字节,将之存放在surplusData引用变量中;
        if(bytes.length()>0)
        {
            surplusData.push_back(bytes);
        }
    }
    else //第一次连接;
    {
        if(bytes.length()==5 && bytes[4]=='.' && bytes.left(3)=="@a0")
            this->FirstConn(bytes[3],socket);
    }

}

void MessageCenter::StateChangedSlot(QAbstractSocket::SocketState state)
{
    QTcpSocket* socket = qobject_cast<QTcpSocket*>(sender());
    if(socket && (QAbstractSocket::ConnectedState == state))
    {
    }
    else if( socket && (QAbstractSocket::UnconnectedState == state))
    {
        QMap<char,QTcpSocket*>::const_iterator it = m_ClientMap.begin();
        for(;it!=m_ClientMap.end();it++)
        {
            if(it.value() == socket)
            {
                qDebug() << "Server: " << it.key() << " removed!" << endl;
                m_ClientMap.remove(it.key());
                m_SurplusDataMap.remove(int(socket));
                break;
            }
        }
    }
}

void MessageCenter::Start()
{
    if(m_TcpServer->listen(QHostAddress::AnyIPv4,9981))
    {
        connect(m_TcpServer,SIGNAL(newConnection()),this,SLOT(NewConnectionSlot()));
    }
}

void MessageCenter::FirstConn(char senderId,QTcpSocket* newSocket)
{
    char id = this->DistributeId(senderId,newSocket->peerAddress().toString()); //如果是内部模块,则为127.0.0.1
    if(id=='\0')//已达最大连接数,通知后断开其socket连接;
    {
        QByteArray bytes;
        bytes.append("@m?0.");
        this->SendMsg(newSocket,bytes);
        newSocket->close(); //断开此socket连接;
        return ;
    }
    else //颁发给此客户一个通信ID;
    {
        QByteArray bytes;
        bytes.append("@a");
        bytes.append(id);
        bytes.append("0.");
        this->SendMsg(newSocket,bytes);\
        qDebug() << "server: " << id << " connected!" << endl;
        m_ClientMap.insert(id,newSocket);
        m_SurplusDataMap.insert(int(newSocket),QByteArray());
    }
}

void MessageCenter::Broadcast(char senderId,QByteArray bytes)
{
    QMap<char,QTcpSocket*>::const_iterator it = m_ClientMap.begin();
    for(;it!=m_ClientMap.end();it++)
    {
        if(it.key() != senderId)
        {
            this->SendMsg(it.value(),bytes);
        }
    }
}

void MessageCenter::SendMsg(QTcpSocket* socket, QByteArray bytes)
{
    uint size = socket->write(bytes);
    socket->flush();
    while(size<(uint)bytes.length())
    {
        size += socket->write(bytes.right(bytes.length()-size));
        socket->flush();
    }
}

char MessageCenter::DistributeId(char senderId,const QString& address)
{
    QList<char> ids = m_ClientMap.keys();
    if( (senderId>='A'&&senderId<='Z') || (senderId>='a'&&senderId<='z'))
    {
        if(!ids.contains(senderId))//如果客户ID的列表中没有,则返回此预约的ID;
            return senderId;
    }
    if(address=="127.0.0.1") //内部组件;
    {
        for(char i='A';i<='Z';i++)
        {
            if(!ids.contains(i))
            {
                return i;
            }
        }
    }
    else //外部组件;
    {
        for(char i='a';i<='z';i++)
        {
            if(!ids.contains(i))
            {
                return i;
            }
        }
    }

    return '\0';
}

uint MessageCenter::GetUint(QByteArray bytes)
{
    uint sum = 0;
    if(bytes.length()==4)
    {
        sum = (bytes[0]-48)*1000 +
                (bytes[1]-48)*100 +
                (bytes[2]-48)*10 +
                (bytes[3]-48);
    }
    return sum;
}

QByteArray MessageCenter::GetBytesOfDataLen(uint num)
{
    QByteArray bytes;
    if(num>0 && num<1024)
    {
        if(num<10)
        {
            char n = num+48;
            bytes.append("000");
            bytes.append(n);
        }
        else if(num<100)
        {
            char n = num/10+48;
            bytes.append("00");
            bytes.append(n);
            n = num%10+48;
            bytes.append(n);
        }
        else if(num<1000)
        {
            char n = num/100+48;
            bytes.append('0');
            bytes.append(n);
            n = num%100/10+48;
            bytes.append(n);
            n = num%100%10+48;
            bytes.append(n);
        }
        else
        {
            char n = num/1000+48;
            bytes.append(n);
            n = num%1000/100+48;
            bytes.append(n);
            n = num%1000%100/10+48;
            bytes.append(n);
            n = num%1000%100%10+48;
            bytes.append(n);
        }
    }
    return bytes;
}

QByteArray MessageCenter::GetBytesOfOnlineClientId(char senderId)
{
    QByteArray bytes;
    bytes.clear();
    if(m_ClientMap.count()>0 && m_ClientMap.keys().contains(senderId))
    {
        QMap<char,QTcpSocket*>::const_iterator it = m_ClientMap.begin();
        for(;it!=m_ClientMap.end();it++)
        {
            char id = it.key();
            if(id != senderId)
            {
                bytes.append(id);
            }
        }
    }
    return bytes;
}

  评论这张
 
阅读(123)| 评论(0)
推荐

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017