警告

本部分包含从C++自动翻译到Python的代码段,可能包含错误。

使用信号量进行生产和消费#

使用信号量进行生产和消费的示例说明如何使用QSemaphore 控制生产者和消费者线程共享的循环缓冲区的访问。

生产者向缓冲区写入数据,直到达到缓冲区末尾,此时它从开头重新开始,覆盖现有数据。消费者线程读取生成数据并将其写入标准错误。

信号量可以实现比互斥锁更高级别的并发。如果缓冲区的访问受QMutex 保护,则消费者线程无法在生产者线程同一时间访问缓冲区。然而,两个线程同时在工作于缓冲区的不同部分上是没有害处的。

此示例包含两个类:ProducerConsumer。两者都继承自QThread。这些类之间通信所使用的循环缓冲区和保护它的信号量是全局变量。

解决生产者-消费者问题而不使用QSemaphore 的另一种方法是使用QWaitConditionQMutex。这正是使用等待条件的生产者和消费者 示例所做的。

全局变量#

让我们首先回顾循环缓冲区和相关的信号量

constexpr int DataSize = 100000
constexpr int BufferSize = 8192
buffer[BufferSize] = char()
freeBytes = QSemaphore(BufferSize)
usedBytes = QSemaphore()

DataSize 是生产者将生成数据的数量。为了使示例尽可能简单,我们将其设为常量。BufferSize 是循环缓冲区的大小。它小于DataSize,这意味着生产者会在某个时候达到缓冲区末尾,并从开头重新开始。

为了同步生产者和消费者,我们需要两个信号量。freeBytes 信号量控制缓冲区的“空闲”区域(即生产者尚未填充数据的区域或消费者已经读取的区域)。usedBytes 信号量控制缓冲区的“使用”区域(即生产者已填充但消费者尚未读取的区域)。

信号量协同工作,确保生产者永远不会领先消费者超过 BufferSize 字节,并且消费者 never reads data that the producer hasn’t generated yet.

freeBytes 信号量初始化为 BufferSize,因为开始时整个缓冲区为空。而 usedBytes 信号量初始化为 0(非指定值时的默认值)。

生产者类#

下面我们来查看 Producer 类的代码

class Producer(QThread):

# public
    def run():

        for i in range(0, DataSize):
            freeBytes.acquire()
            buffer[i % BufferSize] = "ACGT"[QRandomGenerator.global().bounded(4)]
            usedBytes.release()

生产者生成 DataSize 字节数据。在它将字节写入循环缓冲区之前,它必须使用 freeBytes 信号量获取一个“空闲”字节。如果消费者没有跟上生产者的步伐,acquire() 调用可能会阻塞。

到最后,生产者使用 usedBytes 信号量释放一个字节。这个“空闲”字节已经成功地转变成了“已使用”字节,准备好被消费者读取。

消费者类#

现在让我们看一下 Consumer

class Consumer(QThread):

# public
    def run():

        for i in range(0, DataSize):
            usedBytes.acquire()
            fprintf(stderr, "%c", buffer[i % BufferSize])
            freeBytes.release()

        fprintf(stderr, "\n")

代码与生产者非常相似,但这次我们获取一个“已使用”字节并释放一个“空闲”字节,而不是反其道而行之。

主函数#

main() 中,我们创建了两个线程,并调用 wait() 确保在退出之前,两个线程都有时间完成。

if __name__ == "__main__":
app = QCoreApplication(argc, argv)
producer = Producer()
consumer = Consumer()
producer.start()
consumer.start()
producer.wait()
consumer.wait()
return 0

那么,当我们运行程序时会发生什么?最初,只有一个生产者线程可以进行操作;消费者被阻塞,等待 usedBytes 信号量被释放(它的初始 available() 计数是 0)。一旦生产者将一个字节放入缓冲区,freeBytes.available() 就是 BufferSize - 1,而 usedBytes.available() 是 1。这时,可能会发生两件事:消费者线程接管并读取该字节,或者生产者线程继续生产第二个字节。

这个例子中介绍的生产者-消费者模型使得编写高性能的多线程应用程序成为可能。在多核机器上,由于两个线程可以同时在不同的缓冲区部分活动,程序可能是基于互斥锁的程序的两倍快。

然而,要注意的是,这些好处并不总是能够实现。获取和释放 QSemaphore 有一定的开销。在实际情况中,可能值得将缓冲区分成块,并操作这些块而不是单个字节。缓冲区大小也是一个必须仔细根据实验选择的参数。

示例项目 @ code.qt.io