使用等待条件的生产者与消费者
QWaitCondition 和 QMutex 的使用示例演示了如何通过生产者线程和消费者线程共享的环形缓冲器来控制对缓冲器的访问。
生产者将数据写入缓冲区,直到到达缓冲区末尾,然后从开头重新开始,覆盖现有数据。消费者线程读取生成的数据并将其写入标准错误。
等待条件使得具有比使用互斥锁更高的并发级别成为可能。如果仅通过 QMutex 来保护缓冲区的访问,则消费者线程不能在生产者线程同时访问缓冲区。然而,两个线程同时在不同部分工作于缓冲区上是没有害处的。
该示例包括两个类:Producer
和 Consumer
。两个类都继承自 QThread。用于这两个类之间通信的环形缓冲区和保护它的同步工具都是全局变量。
使用 QWaitCondition 和 QMutex 解决生产者-消费者问题的另一种方法是使用 QSemaphore。这正是 使用信号量生产者与消费者 示例所完成的。
全局变量
让我们从查看环形缓冲区和相关的同步工具开始
constexpr int DataSize = 100000; constexpr int BufferSize = 8192; QMutex mutex; // protects the buffer and the counter char buffer[BufferSize]; int numUsedBytes; QWaitCondition bufferNotEmpty; QWaitCondition bufferNotFull;
DataSize
是生产者将生成的数据量。为了使示例尽可能简单,我们将其设为常量。BufferSize
是环形缓冲区的大小。它小于 DataSize
,这意味着在某个时刻生产者将到达缓冲区末尾,然后从开头重新开始。
为了同步生产者和消费者,我们需要两个等待条件和一对互斥锁。当生产者生成了某些数据时,bufferNotEmpty
条件被触发,告诉消费者可以开始读取。当消费者读取了一些数据时,bufferNotFull
条件被触发,告诉生产者可以生成更多数据。numUsedBytes
是缓冲区中包含数据的字节数。
等待条件、互斥锁以及 numUsedBytes
计数器共同确保生产者永远不会比消费者超出 BufferSize
字节,并且消费者永远不会读取生产者尚未生成的数据。
生产者类
让我们回顾一下 Producer
类的代码
class Producer : public QThread { public: explicit Producer(QObject *parent = nullptr) : QThread(parent) { } private: void run() override { for (int i = 0; i < DataSize; ++i) { { const QMutexLocker locker(&mutex); while (numUsedBytes == BufferSize) bufferNotFull.wait(&mutex); } buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)]; { const QMutexLocker locker(&mutex); ++numUsedBytes; bufferNotEmpty.wakeAll(); } } } };
生产者生成 DataSize
字节数据。在它将字节写入环形缓冲区之前,必须首先检查缓冲区是否已满(即numUsedBytes
等于BufferSize
)。如果缓冲区已满,则线程将在bufferNotFull
条件下等待。
最后,生产者会增加numUsedBytes
,并通知bufferNotEmpty
条件为true,因为numUsedBytes
必然大于0。
我们使用互斥锁来保护对numUsedBytes
变量的所有访问。此外,QWaitCondition::wait()函数接受一个互斥锁作为其参数。在将线程置于休眠状态之前,该互斥锁被解锁,在线程唤醒时被锁定。此外,从锁定状态到等待状态的转换是原子的,以防止发生竞争条件。
消费者类
我们现在转向Consumer
类
class Consumer : public QThread { public: explicit Consumer(QObject *parent = nullptr) : QThread(parent) { } private: void run() override { for (int i = 0; i < DataSize; ++i) { { const QMutexLocker locker(&mutex); while (numUsedBytes == 0) bufferNotEmpty.wait(&mutex); } fprintf(stderr, "%c", buffer[i % BufferSize]); { const QMutexLocker locker(&mutex); --numUsedBytes; bufferNotFull.wakeAll(); } } fprintf(stderr, "\n"); } };
代码与生产者非常相似。在我们读取字节之前,我们检查缓冲区是否为空(numUsedBytes
是0),而不是是否已满,并在缓冲区为空的情况下等待bufferNotEmpty
条件。在读取字节后,我们减小numUsedBytes
(而不是增加它),并发信号bufferNotFull
条件(而不是bufferNotEmpty
条件)。
main()函数
在main()
中,我们创建两个线程,并调用QThread::wait()以确保在我们退出之前,两个线程都有时间去完成。
int main(int argc, char *argv[]) { QCoreApplication app(argc, argv); Producer producer; Consumer consumer; producer.start(); consumer.start(); producer.wait(); consumer.wait(); return 0; }
那么我们运行程序会发生什么?最初,只有生产者线程可以进行任何操作;消费者被阻塞等待bufferNotEmpty
条件被触发(numUsedBytes
是0)。一旦生产者在缓冲区中放置了一个字节,numUsedBytes
严格大于0,并且触发bufferNotEmpty
条件。在这种情况下,可能发生两件事:要么消费者线程接管并读取该字节,要么生产者生产第二个字节。
本例中所展示的生产者-消费者模型使得编写高度并发的多线程应用程序成为可能。在多处理器机器上,该程序可能比等效的基于互斥锁的程序快两倍,因为两个线程可以在缓冲区不同的部分同时活动。
但请注意,这些收益并不总是能实现。锁定和解锁一个QMutex是有成本的。在实践中,可能值得将缓冲区分成块,并用块而不是单个字节进行操作。缓冲区大小也是一个必须根据实验仔细选择的参数。
© 2024 The Qt Company Ltd. 本文档中的文档贡献的版权分别归其所有者所有。本提供的文档是根据自由软件基金会发布的GNU自由文档许可证版本1.3的条款许可的。Qt及其相应品牌是芬兰和/或世界其他地区的The Qt Company Ltd.的商标。所有其他商标归其各自所有者所有。