警告

本节包含了从 C++ 自动翻译到 Python 的代码片段,可能含有错误。

生产者和消费者使用等待条件#

生产者和消费者使用等待条件示例展示了如何使用 QWaitConditionQMutex 控制生产者线程和消费者线程对共享循环缓冲区的访问。

生产者在缓冲区充满时向缓冲区写入数据,达到缓冲区末尾时,它重新从开始处写入,覆盖现有数据。消费者线程按数据产生的顺序读取数据并将其写入标准错误。

等待条件使得支持比单纯使用互斥锁更高的并发级别成为可能。如果缓冲区的访问仅通过一个 QMutex 来保护,那么消费者线程就不能在同时与生产者线程访问缓冲区。然而,两个线程同时工作在缓冲区的 不同部分 上是没有任何害处的。

示例包括两个类:ProducerConsumer。这两个类都继承自 QThread 。用于在这两个类之间通信的循环缓冲区和保护它的同步工具是全局变量。

另一种解决生产者-消费者问题的方法是不使用 QWaitConditionQMutex 而是使用 QSemaphore 。这正是 生产者和消费者使用信号量 示例所做的事情。

全局变量#

让我们先回顾一下循环缓冲区和相关的同步工具。

constexpr int DataSize = 100000
constexpr int BufferSize = 8192
QMutex mutex # protects the buffer and the counter
buffer[BufferSize] = char()
numUsedBytes = int()
bufferNotEmpty = QWaitCondition()
bufferNotFull = QWaitCondition()

DataSize 是生产者将生成的数据量。为了尽可能简单,我们将其设为常量。 BufferSize 是循环缓冲区的大小。它小于 DataSize,这意味着生产者在某些时候会到达缓冲区末尾,然后从开头重新开始。

为了同步生产者和消费者,我们需要两个等待条件和一个是互斥量。当生产者生成一些数据时,会触发bufferNotEmpty条件,告知消费者可以开始读取。当消费者读取了一些数据时,会触发bufferNotFull条件,告诉生产者可以生成更多数据。而numUsedBytes表示缓冲区内含有数据的字节数。

这三个要素(等待条件、互斥量和numUsedBytes计数器)确保生产者不会比消费者超出BufferSize字节的进度,同时消费者也不会读取生产者尚未生成的数据。

生产者类#

下面我们来看看Producer类的代码

class Producer(QThread):

# public
    def Producer(None):
    super().__init__(parent)


# private
    def run():

        for i in range(0, DataSize):

                locker = QMutexLocker(mutex)
                while numUsedBytes == BufferSize:
                    bufferNotFull.wait(mutex)

            buffer[i % BufferSize] = "ACGT"[QRandomGenerator.global().bounded(4)]

                locker = QMutexLocker(mutex)
                numUsedBytes = numUsedBytes + 1
                bufferNotEmpty.wakeAll()

生产者生成DataSize字节数据。在向循环缓冲区写入一个字节之前,它必须首先检查缓冲区是否已满(即numUsedBytes等于BufferSize)。如果缓冲区已满,则线程会在bufferNotFull条件下等待。

最后,生产者增加numUsedBytes,并信号表示条件bufferNotEmpty为真,因为numUsedBytes必然大于0。

我们使用互斥量来保护对numUsedBytes变量的所有访问。此外,wait()函数接收一个互斥量作为其参数。这个互斥量在线程被放置休眠之前被解锁,在线程醒来时被锁定。另外,从锁定状态到等待状态的转换是原子的,以防止竞争条件的发生。

消费者类#

现在让我们看看Consumer类的代码

class Consumer(QThread):

# public
    def Consumer(None):
    super().__init__(parent)


# private
    def run():

        for i in range(0, DataSize):

                locker = QMutexLocker(mutex)
                while numUsedBytes == 0:
                    bufferNotEmpty.wait(mutex)

            fprintf(stderr, "%c", buffer[i % BufferSize])

                locker = QMutexLocker(mutex)
                numUsedBytes = numUsedBytes - 1
                bufferNotFull.wakeAll()


        fprintf(stderr, "\n")

代码和产生者非常相似。在我们读取字节之前,我们会检查缓冲区是否为空(numUsedBytes为0)而不是是否已满。如果缓冲区为空,我们会等待bufferNotEmpty条件。在读取字节后,我们会减少numUsedBytes(而不是增加),并发送信号来触发bufferNotFull条件(而不是bufferNotEmpty条件)。

主函数#

main()中,我们创建了两个线程,并调用wait()来确保在我们退出之前,两个线程都有时间完成。

if __name__ == "__main__":
app = QCoreApplication(argc, argv)
producer = Producer()
consumer = Consumer()
producer.start()
consumer.start()
producer.wait()
consumer.wait()
return 0

当运行这个程序时会发生什么?最初,只有生产者线程可以执行操作;消费者因为等待 bufferNotEmpty 条件被触发(numUsedBytes 为 0)而被阻塞。一旦生产者在缓冲区中放入一个字节,numUsedBytes 就会严格大于 0,并触发 bufferNotEmpty 条件。这时,可能发生两件事:消费者线程接管并读取该字节,或者生产者继续生产第二个字节。

本例中展示的生产者-消费者模型使得编写高度并发的多线程应用程序成为可能。在多处理器机器上,程序的速度理论上是等价于基于互斥锁程序的两倍,因为两个线程可以同时在不同缓冲区部分上活跃。

但是需要注意的是,这些好处并不总是能实现。对 QMutex 加锁和解锁是有成本的。在实践中,可能值得将缓冲区分成块,并操作这些块而不是单个字节。此外,缓冲区大小也是一个需要根据实验仔细选择的参数。

示例项目 @ code.qt.io