使用信号量进行生产者与消费者操作

使用信号量进行生产者与消费者操作的示例演示了如何使用QSemaphore 来控制由生产者和消费者线程共享的环形缓冲区的访问。

生产者将数据写入缓冲区,直到达到缓冲区的末尾,此时它从开始处重新启动,覆盖现有数据。消费者线程在生成数据的同时读取数据并将其写入标准错误。

信号量使并发级别高于互斥锁。如果缓冲区的访问受QMutex 的保护,消费者线程就无法在生产者线程访问缓冲区时同时访问缓冲区。然而,两个线程同时处理缓冲区的不同部分并无害处。

此例包括两个类:ProducerConsumer。它们都继承自 QThread。用于这两个类之间通信的环形缓冲区及其保护的信号量是全局变量。

解决生产者-消费者问题的另一种方法是使用 QWaitConditionQMutex,这是使用等待条件的生产者与消费者 示例所采用的。

全局变量

让我们首先回顾环形缓冲区和相关的信号量。

constexpr int DataSize = 100000;

constexpr int BufferSize = 8192;
char buffer[BufferSize];

QSemaphore freeBytes(BufferSize);
QSemaphore usedBytes;

DataSize 是生产者将生成的数据量。为了使示例尽可能简单,我们将其设为常数。BufferSize 是环形缓冲区的大小。它小于 DataSize,这意味着生产者会在某个时刻达到缓冲区的末尾,然后重新从开始处启动。

为了同步生产者和消费者,我们需要两个信号量。freeBytes 信号量控制缓冲区的“空闲”区域(生产者尚未填充数据或消费者已读取的区域)。usedBytes 信号量控制缓冲区的“已使用”区域(生产者已填充但消费者尚未读取的区域)。

信号量共同确保生产者永远不会领先消费者超过 BufferSize 字节,并且在消费者不会读取生产者尚未生成的数据。

freeBytes 信号量初始化为 BufferSize(因为初始时缓冲区是空的)。usedBytes 信号量初始化为 0(如果未指定则默认值)。

生产者类

让我们回顾 Producer 类的代码。

class Producer : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            freeBytes.acquire();
            buffer[i % BufferSize] = "ACGT"[QRandomGenerator::global()->bounded(4)];
            usedBytes.release();
        }
    }
};

生产者生成 DataSize 字节数据。在它将字节写入环形缓冲区之前,必须使用 freeBytes 信号量获取一个“空闲”字节。如果消费者没有跟上生产者的步伐,QSemaphore::acquire() 调用可能会阻塞。

最后,生产者使用 usedBytes 信号量释放一个字节。一个“空闲”字节已经成功转变为“使用中”的字节,准备由消费者读取。

消费者类

让我们现在转向 Consumer

class Consumer : public QThread
{
public:
    void run() override
    {
        for (int i = 0; i < DataSize; ++i) {
            usedBytes.acquire();
            fprintf(stderr, "%c", buffer[i % BufferSize]);
            freeBytes.release();
        }
        fprintf(stderr, "\n");
    }
};

代码非常类似于生产者,只是这次我们获取一个“使用中”的字节并释放一个“空闲”的字节,而不是相反。

main() 函数

main() 中,我们创建两个线程并调用电QThread::wait() 来确保在退出之前,两个线程都有足够的时间完成。

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

那么我们运行程序时会发生什么?最初,只有生产者线程可以进行任何操作;消费者因为等待 usedBytes 信号量被释放而被阻塞(其初始 available() 计数为 0)。一旦生产者将一个字节放入缓冲区,freeBytes.available()BufferSize - 1,而 usedBytes.available() 是 1。在这种情况下,可能会发生两件事:要么消费者线程接管并读取该字节,要么生产者线程生产第二个字节。

本例中展示的生产者-消费者模型使得编写高度并发的多线程应用程序成为可能。在多处理器机器上,该程序可能比等效的基于互斥锁的程序快两倍,因为两个线程可以在不同的缓冲区部分同时活跃。

但请注意,这些好处并不总是能实现。获取和释放一个 QSemaphore 是有成本的。在实践中,可能值得将缓冲区划分为多个块,并使用块而不是单个字节进行操作。缓冲区大小也是一个必须经过实验仔细选择的参数。

示例项目 @ code.qt.io

© 2024 Qt 公司。本文件中包含的文档贡献属于其各自的版权所有者。本文件中的文档是根据由自由软件基金会发布的 GNU 自由文档许可证版本 1.3 的条款许可的。Qt 和相应的商标是芬兰的 Qt 公司及其在世界各地的子公司和关联公司的商标。所有其他商标都属于其各自的所有者。