博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
BOOST 线程完全攻略 - 扩展 - 线程消息通讯
阅读量:5883 次
发布时间:2019-06-19

本文共 10578 字,大约阅读时间需要 35 分钟。

hot3.png

// controlled_module_ex.hpp : controlled_module类的扩展// 增强线程之间消息通讯// 增加线程安全启动和安全关闭功能// 增加定时器功能#pragma once#include 
#include 
#include "controlled_module.hpp"struct _command{    typedef boost::shared_ptr<_command> CCmdPtr;    unsigned int nCmd;    boost::any anyParam;};struct _wait_command{           boost::any par;    unsigned int command;    void * event;    boost::shared_ptr
 resp;};class controlled_module_ex;struct _notify{    controlled_module_ex * sender;    int id;    boost::any par;};#define BM_RESERVE 1000#define BM_RING_START BM_RESERVE+1#define BM_RING_STOP BM_RESERVE+2#define BM_RING_SETTIME  BM_RESERVE+3#define BM_RING_SETPARENT BM_RESERVE+4#define BM_RING_CYCLE BM_RESERVE+5#define BM_RING_PROCESS BM_RESERVE+6#define BM_RING_PROCESSEND BM_RESERVE+7#define BM_RING_PROCESSFAIL BM_RESERVE+8#define BM_TIMER    BM_RESERVE+9#define BM_COMMAND  BM_RESERVE+10#define BM_NOTIFY   BM_RESERVE+11#define BM_USER 9000class controlled_timer;class controlled_module_ex: public controlled_module{public:    controlled_module_ex()    {        m_safe = false;    }    ~controlled_module_ex()    {        safestop();    }public:    template
    bool postmessage(unsigned int nCmd, const boost::shared_ptr
& p)    {        if(this==0||!m_safe)return false;        boost::mutex::scoped_lock lock(m_mutex_command);        _command::CCmdPtr cmd(new _command);        cmd->nCmd = nCmd;        cmd->anyParam = p;        m_list_command.push_back(cmd);        return true;    }    boost::any execute(unsigned int command,boost::any par,int timeout=-1)    {        boost::shared_ptr<_wait_command> shared(new _wait_command);        _wait_command & cmd = *shared;        cmd.command = command;        cmd.event = (void *)CreateEvent(0,FALSE,FALSE,0);        cmd.par = par;        cmd.resp = boost::shared_ptr
(new boost::any);        if(this->postmessage(BM_COMMAND,shared))        {            DWORD dw = WaitForSingleObject(cmd.event,timeout);            CloseHandle(cmd.event);            if(dw!=WAIT_OBJECT_0)                return boost::any();            else                return *cmd.resp;        }        else        {            CloseHandle(cmd.event);            return boost::any();        }    }    void notify(_notify p)    {        this->postmessage(BM_NOTIFY,p);    }    bool postmessage(unsigned int nCmd,boost::any p)    {        if(this==0||!m_safe)            return false;        boost::mutex::scoped_lock lock(m_mutex_command);        _command::CCmdPtr cmd(new _command);        cmd->nCmd = nCmd;        cmd->anyParam = p;        m_list_command.push_back(cmd);        return true;    }    bool postmessage(unsigned int nCmd)    {        if(this==0||!m_safe)            return false;        boost::mutex::scoped_lock lock(m_mutex_command);        _command::CCmdPtr cmd(new _command);        cmd->nCmd = nCmd;        cmd->anyParam = 0;        m_list_command.push_back(cmd);        return true;    }    virtual bool work()    {        if(!getmessage())            return false;        else        {            Sleep(this->m_sleeptime);            return true;        }    }    virtual void message(const _command & cmd)    {        if(cmd.nCmd==BM_RING_START)        {            this->on_safestart();        }        else if(cmd.nCmd==BM_RING_STOP)        {            this->on_safestop();        }        else if(cmd.nCmd==BM_TIMER)        {            this->on_timer(boost::any_cast
(cmd.anyParam));        }        else if(cmd.nCmd==BM_COMMAND)        {            boost::shared_ptr<_wait_command> shared = boost::any_cast< boost::shared_ptr<_wait_command> >(cmd.anyParam);            _wait_command & cmd = *shared;            *cmd.resp = this->on_command(cmd.command,cmd.par);            SetEvent((HANDLE)cmd.event);        }        else if(cmd.nCmd==BM_NOTIFY)        {            try            {                _notify par = boost::any_cast<_notify>(cmd.anyParam);                this->on_notify(par);            }            catch(boost::bad_any_cast)            {            }        }    }    virtual void release()    {        boost::mutex::scoped_lock lock(m_mutex_command);        m_list_command.clear();    }    void safestart()    {        if(!islive())            start();        m_safe = true;        m_safestart_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);        postmessage(BM_RING_START);        ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE);        CloseHandle(m_safestart_event);    }    void safestop()    {    if(this->islive())    {        m_safe = false;        m_safestop_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);        {            boost::mutex::scoped_lock lock(m_mutex_command);            _command::CCmdPtr cmd(new _command);            cmd->nCmd = BM_RING_STOP;            cmd->anyParam = 0;            m_list_command.push_back(cmd);        }        DWORD dw = ::WaitForSingleObject((HANDLE)m_safestop_event,3*1000);        if(WAIT_OBJECT_0!=dw)        {        }        CloseHandle(m_safestop_event);        stop();    }    }    virtual void on_timer(const controlled_timer * p){}    virtual void on_safestart()    {        SetEvent(m_safestart_event);    }    virtual void on_safestop()    {        SetEvent(m_safestop_event);    }    virtual void on_notify(const _notify & p)    {    }protected:    virtual boost::any on_command(const unsigned int command,const boost::any par)    {        return boost::any();    }    bool getmessage()    {        std::list<_command::CCmdPtr> cache;        {            boost::mutex::scoped_lock lock(m_mutex_command);            while(!m_list_command.empty())            {                _command::CCmdPtr p = m_list_command.front();                m_list_command.pop_front();                cache.push_back(p);            }        }        _command::CCmdPtr stop_command;        std::list<_command::CCmdPtr>::iterator item;        for(item = cache.begin();item!=cache.end();item++)        {            if((*(*item)).nCmd==BM_RING_STOP)            {                stop_command = *item;                               break;            }        }        if(stop_command.get()==0)        {            while(!cache.empty())            {                _command::CCmdPtr p = cache.front();                cache.pop_front();                try                {                    if((*p).nCmd!=BM_RING_START)                    {                        if(!this->m_safe)                            continue;                    }                    this->message(*p);                }                catch(boost::bad_any_cast &)                {                }            }            return true;        }        else        {            cache.clear();            this->message(*stop_command);            return false;        }    }private:    void*   m_safestart_event;    void* m_safestop_event;    bool m_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,当父线程safestop以后有可能会收到其他线程的postmessage,这时会引起线程死锁,这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理    boost::mutex m_mutex_command;    std::list<_command::CCmdPtr> m_list_command;};class controlled_timer: public controlled_module_ex{public:  controlled_timer()  {    this->m_time = 0;    this->m_parent = 0;    this->m_step = 0;  }  ~controlled_timer(){  }protected:  controlled_module_ex* m_parent;  int m_time;  int m_step;public:  void starttimer(int time,controlled_module_ex* parent)  {    this->safestart();    this->postmessage(BM_RING_SETPARENT,parent);    this->postmessage(BM_RING_SETTIME,time);  }  void stoptimer()  {    this->safestop();  }public:  virtual void on_safestop()  {    m_time = 0;    controlled_module_ex::on_safestop();  }  virtual void message(const _command & cmd)  {    controlled_module_ex::message(cmd);    if(cmd.nCmd==BM_RING_SETTIME)    {        int time = boost::any_cast
(cmd.anyParam);        this->m_time = time/this->m_sleeptime;        this->postmessage(BM_RING_CYCLE);    }    else if(cmd.nCmd==BM_RING_SETPARENT)    {        this->m_parent  = boost::any_cast
(cmd.anyParam);    }       else if(cmd.nCmd==BM_RING_CYCLE)    {        if(m_time>0)        {            if(m_step>m_time)            {                m_parent->postmessage(BM_TIMER,this);                m_step=0;            }            m_step++;        }        this->postmessage(BM_RING_CYCLE);    }   }};1.向线程PostMessage  函数controlled_module_ex::postmessage完成消息推送。  虚拟函数controlled_module_ex::message(const _command & cmd)实现消息接收。  #include "controlled_module_ex.hpp"class thdex: public controlled_module_ex{protected:    virtual void message(const _command & cmd)    {        controlled_module_ex::message(cmd);        if(cmd.nCmd==BM_USER+1)        {            cout << "get message" << endl;        }    }};int _tmain(int argc, _TCHAR* argv[]){    thdex t;    t.safestart();    t.postmessage(BM_USER+1);    char buf[10];    gets_s(buf,sizeof buf);    t.safestop();    return 0;}  2.向线程PostMessage,并携带简单对象参数 我们都知道常规的PostMessage要传参,如果是整型参数,还可以用强制转换,但如果是其他类型,例如字符串,我们就必须创建一个字符串缓冲,把缓冲指针作为参数传过去,线程还不能忘记删除,否则导致内存泄漏,自定义结构也是一样的操作,如果想尝试传递一个CString对象,是不可能的。  幸运的是boost提供了boost::any来抽象任何对象类型,controlled_module_ex的消息传递都是基于boost::any来完成,程序员可以由此写出干净而且内存安全的代码。#include "controlled_module_ex.hpp"struct mystruct{    string a;    int b;};class thdex: public controlled_module_ex{protected:    virtual void message(const _command & cmd)    {        controlled_module_ex::message(cmd);        if(cmd.nCmd==BM_USER+1)        {            cout << "get integer:" << boost::any_cast
(cmd.anyParam) << endl;        }        if(cmd.nCmd==BM_USER+2)        {            cout << "get string:" << boost::any_cast
(cmd.anyParam) << endl;        }        if(cmd.nCmd==BM_USER+3)        {            mystruct par = boost::any_cast
(cmd.anyParam);            cout << "get mystruct:" << par.a << "," << par.b << endl;        }    }};int _tmain(int argc, _TCHAR* argv[]){    thdex t;    t.safestart();    t.postmessage(BM_USER+1,123);    t.postmessage(BM_USER+2,string("hello world"));    mystruct par;    par.a = "hello world";    par.b = 123;    t.postmessage(BM_USER+3,par);    char buf[10];    gets_s(buf,sizeof buf);    t.safestop();    return 0;}3.向线程PostMessage,并传递内存块参数  假如我们书写了一个录音子线程,如何将录制的语音数据传递给其他线程呢,常规做法是创建一个缓冲,将语音数据填充进去,然后将缓冲地址作为参数传递,这种做法要求接收线程不能忘记删除,否则会导致内存泄漏。  幸运的是boost提供了智能指针,可以类似java,c#的智能内存回收一样来管理内存分配,我们如果使用这个对象来作为参数传递,就可以完美的防范内存泄漏行为,就算子线程没有处理,别担心,内存它会自动回收的。#include "controlled_module_ex.hpp"struct mystruct{    boost::shared_ptr
 data;    int datalen;};class thdex: public controlled_module_ex{protected:    virtual void message(const _command & cmd)    {        controlled_module_ex::message(cmd);        if(cmd.nCmd==BM_USER+1)        {            cout << "get sharedptr" << endl; //仅仅得到数据,得不到数据长度        }        if(cmd.nCmd==BM_USER+2)        {            mystruct par = boost::any_cast
(cmd.anyParam);            cout << "get sharedptr len:" << par.datalen << endl;        }    }};int _tmain(int argc, _TCHAR* argv[]){    thdex t;    t.safestart();    t.postmessage(BM_USER+1,boost::shared_ptr
(new char[1000]));    mystruct par;    par.datalen = 1000;    par.data = boost::shared_ptr
(new char[par.datalen]);    t.postmessage(BM_USER+2,par);    char buf[10];    gets_s(buf,sizeof buf);    t.safestop();    return 0;}3.向线程SendMessage  函数controlled_module_ex::execute完成这个工作  虚拟函数controlled_module_ex::on_command(const unsigned int command,const boost::any par)响应消息处理#include "controlled_module_ex.hpp"class thdex: public controlled_module_ex{protected:    boost::any on_command(const unsigned int command,const boost::any par)    {        if(command==1)        {            cout << "on command" << endl;            return 0;        }        if(command==2)        {            cout << "on command,par:" << boost::any_cast
(par) << endl;            return 0;        }        if(command==3)        {            return true;        }        else            return controlled_module_ex::on_command(command,par);    }};int _tmain(int argc, _TCHAR* argv[]){    thdex t;    t.safestart();    t.execute(1,0);//等待子线程处理完成    t.execute(2,string("hello world"));//带参数 等待子线程完成    bool rs = boost::any_cast
(t.execute(3,0));//等待子线程处理完成,并取得返回值    cout << "get thread result:" << rs << endl;    boost::any timeout = t.execute(4,0,1000);//等待子线程处理,超时1秒    if(timeout.empty())        cout << "timeout " << endl;    char buf[10];    gets_s(buf,sizeof buf);    t.safestop();    return 0;}4.定时器  类似于CWnd::OnTimer,controlled_module_ex也提供一个虚拟函数virtual void on_timer(const controlled_timer * p);来处理定时#include "controlled_module_ex.hpp"class thdex: public controlled_module_ex{protected:    virtual void on_timer(const controlled_timer *p)    {        cout << "ontimer" << endl;    }};int _tmain(int argc, _TCHAR* argv[]){    thdex t;    controlled_timer timer;    t.safestart();    timer.starttimer(1000,&t);    char buf[10];    gets_s(buf,sizeof buf);    t.safestop();    return 0;}

转载于:https://my.oschina.net/lcxidian/blog/381652

你可能感兴趣的文章
Sublime console installation instructions install Package Control
查看>>
javaweb各种框架组合案例(三):maven+spring+springMVC+hibernate
查看>>
【HDOJ】2319 Card Trick
查看>>
MySQL在Linux下的安装
查看>>
第二周 登录小界面
查看>>
php函数 array_values()
查看>>
兼容firefox的iframe高度自适应代码
查看>>
Angularjs学习笔记(三)----依赖注入
查看>>
小学期软件工程团队项目进度1
查看>>
MOOC_Java进阶_翁恺讲_第三周题
查看>>
React-router匹配的组件 如何使用?
查看>>
HTML5 JS 实现浏览器全屏(F11的效果)
查看>>
Python进修> What is Python ?
查看>>
Thinkphp 学习笔记
查看>>
[洛谷P1631] 序列合并
查看>>
全局组件父子传参(父传子)
查看>>
Linux vsftpd编译安装和配置允许本地用户登录FTP
查看>>
技术 | 使用深度学习检测DGA(域名生成算法)
查看>>
使用java中的反射实现简单的web请求解耦
查看>>
js网页滚动条滚动事件
查看>>