• 企业400电话
  • 网络优化推广
  • AI电话机器人
  • 呼叫中心
  • 全 部 栏 目

    网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    linux中编写自己的并发队列类(Queue 并发阻塞队列)
    POST TIME:2021-10-18 16:38

    设计并发队列

    复制代码 代码如下:

    #include pthread.h>
    #include list>
    using namespace std;

    template typename T>
    class Queue
    {
    public:
        Queue( )
        {
            pthread_mutex_init(_lock, NULL);
        }
        ~Queue( )
        {
            pthread_mutex_destroy(_lock);
        }
        void push(const T data);
        T pop( );
    private:
        listT> _list;
        pthread_mutex_t _lock;
    };

    template typename T>
    void QueueT>::push(const T value )
    {
        pthread_mutex_lock(_lock);
        _list.push_back(value);
        pthread_mutex_unlock(_lock);
    }

    template typename T>
    T QueueT>::pop( )
    {
        if (_list.empty( ))
        {
            throw "element not found";
        }
        pthread_mutex_lock(_lock);
        T _temp = _list.front( );
        _list.pop_front( );
        pthread_mutex_unlock(_lock);
        return _temp;
    }

    上述代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。给出修改后的 Queue 类。

    复制代码 代码如下:

    template typename T>
    class Queue
    {
    public:
        Queue( )
        {
            pthread_mutex_init(_rlock, NULL);
            pthread_mutex_init(_wlock, NULL);
        }
        ~Queue( )
        {
            pthread_mutex_destroy(_rlock);
            pthread_mutex_destroy(_wlock);
        }
        void push(const T data);
        T pop( );
    private:
        listT> _list;
        pthread_mutex_t _rlock, _wlock;
    };


    template typename T>
    void QueueT>::push(const T value )
    {
        pthread_mutex_lock(_wlock);
        _list.push_back(value);
        pthread_mutex_unlock(_wlock);
    }

    template typename T>
    T QueueT>::pop( )
    {
        if (_list.empty( ))
        {
            throw "element not found";
        }
        pthread_mutex_lock(_rlock);
        T _temp = _list.front( );
        _list.pop_front( );
        pthread_mutex_unlock(_rlock);
        return _temp;
    }

    设计并发阻塞队列

    目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量,即 pthread_cond_t 类型的变量。

    复制代码 代码如下:

    template typename T>
    class BlockingQueue
    {
    public:
        BlockingQueue ( )
        {
            pthread_mutexattr_init(_attr);
            // set lock recursive
            pthread_mutexattr_settype(_attr,PTHREAD_MUTEX_RECURSIVE_NP);
            pthread_mutex_init(_lock,_attr);
            pthread_cond_init(_cond, NULL);
        }
        ~BlockingQueue ( )
        {
            pthread_mutex_destroy(_lock);
            pthread_cond_destroy(_cond);
        }
        void push(const T data);
        bool push(const T data, const int seconds); //time-out push
        T pop( );
        T pop(const int seconds); // time-out pop

    private:
        listT> _list;
        pthread_mutex_t _lock;
        pthread_mutexattr_t _attr;
        pthread_cond_t _cond;
    };

    template typename T>
    T BlockingQueueT>::pop( )
    {
        pthread_mutex_lock(_lock);
        while (_list.empty( ))
        {
            pthread_cond_wait(_cond, _lock) ;
        }
        T _temp = _list.front( );
        _list.pop_front( );
        pthread_mutex_unlock(_lock);
        return _temp;
    }

    template typename T>
    void BlockingQueue T>::push(const T value )
    {
        pthread_mutex_lock(_lock);
        const bool was_empty = _list.empty( );
        _list.push_back(value);
        pthread_mutex_unlock(_lock);
        if (was_empty)
            pthread_cond_broadcast(_cond);
    }

    并发阻塞队列设计有两个要注意的方面:

    1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。

    2.可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。强烈建议使用基于 while 循环的 pop()。

    设计有超时限制的并发阻塞队列

    在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。

    复制代码 代码如下:

    template typename T>
    bool BlockingQueue T>::push(const T data, const int seconds)
    {
        struct timespec ts1, ts2;
        const bool was_empty = _list.empty( );
        clock_gettime(CLOCK_REALTIME, ts1);
        pthread_mutex_lock(_lock);
        clock_gettime(CLOCK_REALTIME, ts2);
        if ((ts2.tv_sec – ts1.tv_sec) seconds)
        {
            was_empty = _list.empty( );
            _list.push_back(value);
        }
        pthread_mutex_unlock(_lock);
        if (was_empty)
            pthread_cond_broadcast(_cond);
    }

    template typename T>
    T BlockingQueue T>::pop(const int seconds)
    {
        struct timespec ts1, ts2;
        clock_gettime(CLOCK_REALTIME, ts1);
        pthread_mutex_lock(_lock);
        clock_gettime(CLOCK_REALTIME, ts2);

        // First Check: if time out when get the _lock
        if ((ts1.tv_sec – ts2.tv_sec) seconds)
        {
            ts2.tv_sec += seconds; // specify wake up time
            while(_list.empty( ) (result == 0))
            {
                result = pthread_cond_timedwait(_cond, _lock, ts2) ;
            }
            if (result == 0) // Second Check: if time out when timedwait 
            {
                T _temp = _list.front( );
                _list.pop_front( );
                pthread_mutex_unlock(_lock);
                return _temp;
            }
        }
        pthread_mutex_unlock(lock);
        throw "timeout happened";
    }

    设计有大小限制的并发阻塞队列

    最后,讨论有大小限制的并发阻塞队列。这种队列与并发阻塞队列相似,但是对队列的大小有限制。在许多内存有限的嵌入式系统中,确实需要有大小限制的队列。
    对于阻塞队列,只有读线程需要在队列中没有数据时等待。对于有大小限制的阻塞队列,如果队列满了,写线程也需要等待。

    复制代码 代码如下:

    template typename T>
    class BoundedBlockingQueue
    {
    public:
        BoundedBlockingQueue (int size) : maxSize(size)
        {
            pthread_mutex_init(_lock, NULL);
            pthread_cond_init(_rcond, NULL);
            pthread_cond_init(_wcond, NULL);
            _array.reserve(maxSize);
        }
        ~BoundedBlockingQueue ( )
        {
            pthread_mutex_destroy(_lock);
            pthread_cond_destroy(_rcond);
            pthread_cond_destroy(_wcond);
        }
        void push(const T data);
        T pop( );
    private:
        vectorT> _array; // or T* _array if you so prefer
        int maxSize;
        pthread_mutex_t _lock;
        pthread_cond_t _rcond, _wcond;
    };

    template typename T>
    void BoundedBlockingQueue T>::push(const T value )
    {
        pthread_mutex_lock(_lock);
        const bool was_empty = _array.empty( );
        while (_array.size( ) == maxSize)
        {
            pthread_cond_wait(_wcond, _lock);
        }
        _array.push_back(value);
        pthread_mutex_unlock(_lock);
        if (was_empty)
            pthread_cond_broadcast(_rcond);
    }

    template typename T>
    T BoundedBlockingQueueT>::pop( )
    {
        pthread_mutex_lock(_lock);
        const bool was_full = (_array.size( ) == maxSize);
        while(_array.empty( ))
        {
            pthread_cond_wait(_rcond, _lock) ;
        }
        T _temp = _array.front( );
        _array.erase( _array.begin( ));
        pthread_mutex_unlock(_lock);
        if (was_full)
            pthread_cond_broadcast(_wcond);
        return _temp;
    }

    要注意的第一点是,这个阻塞队列有两个条件变量而不是一个。如果队列满了,写线程等待 _wcond 条件变量;读线程在从队列中取出数据之后需要通知所有线程。同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息。如果在发送广播通知时没有线程在等待 _wcond 或 _rcond,会发生什么?什么也不会发生;系统会忽略这些消息。还要注意,两个条件变量使用相同的互斥锁。

     

    您可能感兴趣的文章:
    • linux中高并发socket最大连接数的优化详解
    • Linux netstat命令查看并发连接数的方法
    • Linux下高并发socket最大连接数所受的各种限制(详解)
    • linux并发连接50万的配置方法
    • 浅谈Linux环境下并发编程中C语言fork()函数的使用
    • Linux下apache如何限制并发连接和下载速度
    • Linux并发执行很简单,这么做就对了
    上一篇:Linux oracle数据库自动备份自动压缩脚本代码
    下一篇:linux多线程编程详解教程(线程通过信号量实现通信代码)
  • 相关文章
  • 

    关于我们 | 付款方式 | 荣誉资质 | 业务提交 | 代理合作


    © 2016-2020 巨人网络通讯

    时间:9:00-21:00 (节假日不休)

    地址:江苏信息产业基地11号楼四层

    《增值电信业务经营许可证》 苏B2-20120278

    X

    截屏,微信识别二维码

    微信号:veteran88

    (点击微信号复制,添加好友)

     打开微信