使用等待条件的生产者与消费者

QWaitConditionQMutex 的使用示例演示了如何通过生产者线程和消费者线程共享的环形缓冲器来控制对缓冲器的访问。

生产者将数据写入缓冲区,直到到达缓冲区末尾,然后从开头重新开始,覆盖现有数据。消费者线程读取生成的数据并将其写入标准错误。

等待条件使得具有比使用互斥锁更高的并发级别成为可能。如果仅通过 QMutex 来保护缓冲区的访问,则消费者线程不能在生产者线程同时访问缓冲区。然而,两个线程同时在不同部分工作于缓冲区上是没有害处的。

该示例包括两个类:ProducerConsumer。两个类都继承自 QThread。用于这两个类之间通信的环形缓冲区和保护它的同步工具都是全局变量。

使用 QWaitConditionQMutex 解决生产者-消费者问题的另一种方法是使用 QSemaphore。这正是 使用信号量生产者与消费者 示例所完成的。

全局变量

让我们从查看环形缓冲区和相关的同步工具开始

constexpr int DataSize = 100000;
constexpr int BufferSize = 8192;

QMutex mutex; // protects the buffer and the counter
char buffer[BufferSize];
int numUsedBytes;

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;

DataSize 是生产者将生成的数据量。为了使示例尽可能简单,我们将其设为常量。BufferSize 是环形缓冲区的大小。它小于 DataSize,这意味着在某个时刻生产者将到达缓冲区末尾,然后从开头重新开始。

为了同步生产者和消费者,我们需要两个等待条件和一对互斥锁。当生产者生成了某些数据时,bufferNotEmpty 条件被触发,告诉消费者可以开始读取。当消费者读取了一些数据时,bufferNotFull 条件被触发,告诉生产者可以生成更多数据。numUsedBytes 是缓冲区中包含数据的字节数。

等待条件、互斥锁以及 numUsedBytes 计数器共同确保生产者永远不会比消费者超出 BufferSize 字节,并且消费者永远不会读取生产者尚未生成的数据。

生产者类

让我们回顾一下 Producer 类的代码

class Producer : public QThread
{
public:
    explicit Producer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == BufferSize)
                    bufferNotFull.wait(&mutex);
            }

            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];

            {
                const QMutexLocker locker(&mutex);
                ++numUsedBytes;
                bufferNotEmpty.wakeAll();
            }
        }
    }
};

生产者生成 DataSize 字节数据。在它将字节写入环形缓冲区之前,必须首先检查缓冲区是否已满(即numUsedBytes等于BufferSize)。如果缓冲区已满,则线程将在bufferNotFull条件下等待。

最后,生产者会增加numUsedBytes,并通知bufferNotEmpty条件为true,因为numUsedBytes必然大于0。

我们使用互斥锁来保护对numUsedBytes变量的所有访问。此外,QWaitCondition::wait()函数接受一个互斥锁作为其参数。在将线程置于休眠状态之前,该互斥锁被解锁,在线程唤醒时被锁定。此外,从锁定状态到等待状态的转换是原子的,以防止发生竞争条件。

消费者类

我们现在转向Consumer

class Consumer : public QThread
{
public:
    explicit Consumer(QObject *parent = nullptr)
        : QThread(parent)
    {
    }

private:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            {
                const QMutexLocker locker(&mutex);
                while (numUsedBytes == 0)
                    bufferNotEmpty.wait(&mutex);
            }

            fprintf(stderr, "%c", buffer[i % BufferSize]);

            {
                const QMutexLocker locker(&mutex);
                --numUsedBytes;
                bufferNotFull.wakeAll();
            }
        }
        fprintf(stderr, "\n");
    }
};

代码与生产者非常相似。在我们读取字节之前,我们检查缓冲区是否为空(numUsedBytes是0),而不是是否已满,并在缓冲区为空的情况下等待bufferNotEmpty条件。在读取字节后,我们减小numUsedBytes(而不是增加它),并发信号bufferNotFull条件(而不是bufferNotEmpty条件)。

main()函数

main()中,我们创建两个线程,并调用QThread::wait()以确保在我们退出之前,两个线程都有时间去完成。

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

那么我们运行程序会发生什么?最初,只有生产者线程可以进行任何操作;消费者被阻塞等待bufferNotEmpty条件被触发(numUsedBytes是0)。一旦生产者在缓冲区中放置了一个字节,numUsedBytes严格大于0,并且触发bufferNotEmpty条件。在这种情况下,可能发生两件事:要么消费者线程接管并读取该字节,要么生产者生产第二个字节。

本例中所展示的生产者-消费者模型使得编写高度并发的多线程应用程序成为可能。在多处理器机器上,该程序可能比等效的基于互斥锁的程序快两倍,因为两个线程可以在缓冲区不同的部分同时活动。

但请注意,这些收益并不总是能实现。锁定和解锁一个QMutex是有成本的。在实践中,可能值得将缓冲区分成块,并用块而不是单个字节进行操作。缓冲区大小也是一个必须根据实验仔细选择的参数。

示例项目 @ code.qt.io

© 2024 The Qt Company Ltd. 本文档中的文档贡献的版权分别归其所有者所有。本提供的文档是根据自由软件基金会发布的GNU自由文档许可证版本1.3的条款许可的。Qt及其相应品牌是芬兰和/或世界其他地区的The Qt Company Ltd.的商标。所有其他商标归其各自所有者所有。