后台线程与QThread在PyQt

我有一个程序,它通过我用PyQt编写的GUI与我正在使用的无线电设备交互。 很明显,无线电的主要功能之一就是传输数据,但是为了连续传输,我必须在循环中写入,这会导致GUI挂起。 由于我从来没有处理过线程,我曾尝试用QCoreApplication.processEvents()来消除这些挂起。 但无线电在两次传输之间需要休眠,所以在休眠期间GUI仍然会挂起。

有没有简单的方法用QThread来解决这个问题? 我已经找到了一些关于如何用PyQt实现多线程的教程,但其中大部分都是关于搭建服务器的,比我需要的要复杂得多。 我其实不需要线程在运行时更新任何东西,我只需要启动它,让它在后台传输,然后停止它。

我创建了一个小例子,展示了处理线程的三种不同的简单方法。 我希望这会帮助你找到解决问题的正确途径。

 # Three minimal ways to run blocking work off the main thread with PyQt5.
import sys
import time

from PyQt5.QtCore import (QCoreApplication, QObject, QRunnable, QThread,
                          QThreadPool, pyqtSignal)


# Subclassing QThread
# http://qt-project.org/doc/latest/qthread.html
class AThread(QThread):
    """Thread whose work is written directly in an overridden ``run``."""

    def run(self):
        """Sleep one second and print a message, five times."""
        for _ in range(5):
            time.sleep(1)
            print("A Increasing")


# Subclassing QObject and using moveToThread
# http://blog.qt.digia.com/blog/2007/07/05/qthreads-no-longer-abstract
class SomeObject(QObject):
    """Worker object meant to be moved onto a separate ``QThread``."""

    # Emitted once ``long_running`` has completed all of its iterations.
    finished = pyqtSignal()

    def long_running(self):
        """Sleep one second and print a message, five times, then emit
        ``finished``."""
        for _ in range(5):
            time.sleep(1)
            print("B Increasing")
        self.finished.emit()


# Using a QRunnable
# http://qt-project.org/doc/latest/qthreadpool.html
# Note that a QRunnable isn't a subclass of QObject and therefore does
# not provide signals and slots.
class Runnable(QRunnable):
    """Task intended to be handed to a ``QThreadPool``."""

    def run(self):
        """Print a message and sleep one second, five times, then quit the
        application instance that was looked up before the loop."""
        application = QCoreApplication.instance()
        for _ in range(5):
            print("C Increasing")
            time.sleep(1)
        application.quit()


def using_q_thread():
    """Run ``AThread`` and exit the process when the thread has finished."""
    app = QCoreApplication([])
    worker_thread = AThread()
    # The thread's ``finished`` signal ends the application event loop.
    worker_thread.finished.connect(app.exit)
    worker_thread.start()
    exit_code = app.exec_()
    sys.exit(exit_code)


def using_move_to_thread():
    """Run ``SomeObject.long_running`` on a plain ``QThread`` and exit the
    process once that thread has finished."""
    app = QCoreApplication([])
    host_thread = QThread()
    worker = SomeObject()
    worker.moveToThread(host_thread)
    # worker done -> thread quits -> thread finished -> application exits
    worker.finished.connect(host_thread.quit)
    host_thread.started.connect(worker.long_running)
    host_thread.finished.connect(app.exit)
    host_thread.start()
    exit_code = app.exec_()
    sys.exit(exit_code)


def using_q_runnable():
    """Submit a ``Runnable`` to the global thread pool and run the event
    loop; the runnable itself quits the application when it is done."""
    app = QCoreApplication([])
    task = Runnable()
    QThreadPool.globalInstance().start(task)
    exit_code = app.exec_()
    sys.exit(exit_code)


if __name__ == "__main__":
    # Pick one of the three demonstrations:
    #using_q_thread()
    #using_move_to_thread()
    using_q_runnable()

我以Matt非常好的例子为基础,修正了其中的拼写错误;另外由于PyQt 4.8现在已经很常见,我删除了那个占位用的虚拟类(dummy class),并为dataReady信号添加了一个示例。

 # -*- coding: utf-8 -*-
# Worker-object threading demo for PyQt4 with a data-carrying signal.
import sys

from PyQt4 import QtCore, QtGui
from PyQt4.QtCore import Qt


# very testable class (hint: you can use mock.Mock for the signals)
class Worker(QtCore.QObject):
    """Object whose slots are meant to run on a worker ``QThread``."""

    # Emitted at the end of every processing slot.
    finished = QtCore.pyqtSignal()
    # Emitted with a (list, dict) payload for each processed item.
    dataReady = QtCore.pyqtSignal(list, dict)

    @QtCore.pyqtSlot()
    def processA(self):
        """Print a marker and emit ``finished``; takes no arguments."""
        # Parenthesised single-argument print behaves the same whether
        # ``print`` is a statement or a function.
        print("Worker.processA()")
        self.finished.emit()

    @QtCore.pyqtSlot(str, list, list)
    def processB(self, foo, bar=None, baz=None):
        """Emit ``dataReady`` once per element of ``bar``, then ``finished``.

        :param foo: unused by this example
        :param bar: iterable of items to process; ``None`` means no items
        :param baz: unused by this example
        """
        print("Worker.processB()")
        # ``bar`` defaults to None, which is not iterable; treat it as empty
        # instead of raising TypeError.
        for thing in bar or ():
            # lots of processing...
            self.dataReady.emit(['dummy', 'data'], {'dummy': ['data']})
        self.finished.emit()


def onDataReady(aList, aDict):
    """Print the payload delivered by ``Worker.dataReady``."""
    print('onDataReady')
    print(repr(aList))
    print(repr(aDict))


app = QtGui.QApplication(sys.argv)

thread = QtCore.QThread()  # no parent!
obj = Worker()  # no parent!
obj.dataReady.connect(onDataReady)

obj.moveToThread(thread)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
obj.finished.connect(thread.quit)

# one way to do it is to start processing as soon as the thread starts
# this is okay in some cases... but makes it harder to send data to
# the worker object from the main gui thread.  As you can see I'm calling
# processA() which takes no arguments
thread.started.connect(obj.processA)
thread.finished.connect(app.exit)

thread.start()

# another way to do it, which is a bit fancier, allows you to talk back and
# forth with the object in a thread safe way by communicating through signals
# and slots (now that the thread is running I can start calling methods on
# the worker object)
QtCore.QMetaObject.invokeMethod(obj, 'processB', Qt.QueuedConnection,
                                QtCore.Q_ARG(str, "Hello World!"),
                                QtCore.Q_ARG(list, ["args", 0, 1]),
                                QtCore.Q_ARG(list, []))

# that looks a bit scary, but its a totally ok thing to do in Qt,
# we're simply using the system that Signals and Slots are built on top of,
# the QMetaObject, to make it act like we safely emitted a signal for
# the worker thread to pick up when its event loop resumes (so if its doing
# a bunch of work you can call this method 10 times and it will just queue
# up the calls.  Note: PyQt > 4.6 will not allow you to pass in a None
# instead of an empty list, it has stricter type checking

app.exec_()

把这个答案更新为PyQt5,python3.4

可以把它当作一种模式:启动一个不接收输入数据的工作对象(worker),并在数据可用时把数据返回给窗体(Form)。

1 – 工人类更小,放在自己的文件worker.py中,便于记忆和独立的软件重用。

2 – main.py文件是定义GUI Form类的文件

3 – 线程对象不是子类。

4 – 线程对象和工作对象都属于Form对象

5 – 程序的步骤在评论中。

  # worker.py
import time

from PyQt5.QtCore import QThread, QObject, pyqtSignal, pyqtSlot


class Worker(QObject):
    """Counter that reports progress through signals."""

    # Emitted once after the last count has been reported.
    finished = pyqtSignal()
    # Emitted with the current count after each one-second pause.
    intReady = pyqtSignal(int)

    @pyqtSlot()
    def procCounter(self):  # A slot takes no params
        """Count from 1 to 99, sleeping one second before announcing each
        value via ``intReady``; emit ``finished`` at the end."""
        for tick in range(1, 100):
            time.sleep(1)
            self.intReady.emit(tick)

        self.finished.emit()

主要文件是:

  #main.py
import sys

from PyQt5.QtCore import QThread
from PyQt5.QtWidgets import QApplication, QLabel, QWidget, QGridLayout

import worker


class Form(QWidget):
    """Window with a single label fed by a ``worker.Worker`` that runs on
    its own ``QThread``."""

    def __init__(self):
        """Create the worker and thread, wire their signals, start the
        thread and then build and show the UI."""
        super().__init__()
        self.label = QLabel("0")

        # 1 - create Worker and Thread inside the Form
        self.obj = worker.Worker()  # no parent!
        # NOTE(review): this attribute shadows the inherited ``thread()``
        # method on this instance; kept because the name is part of the
        # class's existing interface.
        self.thread = QThread()  # no parent!

        # 2 - Connect Worker`s Signals to Form method slots to post data.
        self.obj.intReady.connect(self.onIntReady)

        # 3 - Move the Worker object to the Thread object
        self.obj.moveToThread(self.thread)

        # 4 - Connect Worker Signals to the Thread slots
        self.obj.finished.connect(self.thread.quit)

        # 5 - Connect Thread started signal to Worker operational slot method
        self.thread.started.connect(self.obj.procCounter)

        # * - Thread finished signal will close the app if you want!
        #self.thread.finished.connect(app.exit)

        # 6 - Start the thread
        self.thread.start()

        # 7 - Start the form
        self.initUI()

    def initUI(self):
        """Lay out the label, position and title the window, and show it."""
        layout = QGridLayout()
        self.setLayout(layout)
        layout.addWidget(self.label, 0, 0)

        self.move(300, 150)
        self.setWindowTitle('thread test')
        self.show()

    def onIntReady(self, i):
        """Display the value delivered by the worker's ``intReady``."""
        text = "{}".format(i)
        self.label.setText(text)


app = QApplication(sys.argv)
form = Form()
sys.exit(app.exec_())

根据Qt开发者的说法,继承QThread是不正确的做法(参见 http://blog.qt.digia.com/blog/2010/06/17/youre-doing-it-wrong/ )。 但那篇文章真的很难理解(而且标题有点居高临下)。 我找到了一篇更好的博客文章,更详细地解释了为什么应该使用另一种线程风格: http://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/

在我看来,你可能永远不应该为了重载run方法而继承线程类。 虽然这样确实可行,但你基本上绕过了Qt希望你采用的工作方式。 另外,你会失去事件处理以及线程安全的信号和槽等特性。 并且,正如你可能在上面的博客文章中看到的,“正确”的线程用法会迫使你编写更易于测试的代码。

这里有几个如何使用PyQt中QThread的例子(我另外发布了一个答案,演示如何正确使用QRunnable并结合信号/槽;如果你有很多需要负载均衡的异步任务,那个答案会更合适)。

 # Worker-object threading demo for PyQt4, including the PyQt <= 4.6 shim.
import sys

from PyQt4 import QtCore
from PyQt4 import QtGui
from PyQt4.QtCore import Qt


# very testable class (hint: you can use mock.Mock for the signals)
class Worker(QtCore.QObject):
    """Object whose slots are meant to run on a worker thread."""

    # Emitted at the end of every processing slot.
    finished = QtCore.pyqtSignal()
    # Emitted with a (list, dict) payload for each processed item.
    dataReady = QtCore.pyqtSignal(list, dict)

    @QtCore.pyqtSlot()
    def processA(self):
        """Print a marker and emit ``finished``; takes no arguments."""
        # Parenthesised single-argument print behaves the same whether
        # ``print`` is a statement or a function.
        print("Worker.processA()")
        self.finished.emit()

    @QtCore.pyqtSlot(str, list, list)
    def processB(self, foo, bar=None, baz=None):
        """Emit ``dataReady`` once per element of ``bar``, then ``finished``.

        :param foo: unused by this example
        :param bar: iterable of items to process; ``None`` means no items
        :param baz: unused by this example
        """
        print("Worker.processB()")
        # ``bar`` defaults to None, which is not iterable; treat it as empty
        # instead of raising TypeError.
        for thing in bar or ():
            # lots of processing...
            self.dataReady.emit(['dummy', 'data'], {'dummy': ['data']})
        self.finished.emit()


class Thread(QtCore.QThread):
    """Need for PyQt4 <= 4.6 only"""

    def __init__(self, parent=None):
        QtCore.QThread.__init__(self, parent)

    # this class is solely needed for these two methods, there
    # appears to be a bug in PyQt 4.6 that requires you to
    # explicitly call run and start from the subclass in order
    # to get the thread to actually start an event loop

    def start(self):
        QtCore.QThread.start(self)

    def run(self):
        QtCore.QThread.run(self)


app = QtGui.QApplication(sys.argv)

thread = Thread()  # no parent!
obj = Worker()  # no parent!
obj.moveToThread(thread)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
obj.finished.connect(thread.quit)

# one way to do it is to start processing as soon as the thread starts
# this is okay in some cases... but makes it harder to send data to
# the worker object from the main gui thread.  As you can see I'm calling
# processA() which takes no arguments
thread.started.connect(obj.processA)
thread.start()

# another way to do it, which is a bit fancier, allows you to talk back and
# forth with the object in a thread safe way by communicating through signals
# and slots (now that the thread is running I can start calling methods on
# the worker object)
QtCore.QMetaObject.invokeMethod(obj, 'processB', Qt.QueuedConnection,
                                QtCore.Q_ARG(str, "Hello World!"),
                                QtCore.Q_ARG(list, ["args", 0, 1]),
                                QtCore.Q_ARG(list, []))

# that looks a bit scary, but its a totally ok thing to do in Qt,
# we're simply using the system that Signals and Slots are built on top of,
# the QMetaObject, to make it act like we safely emitted a signal for
# the worker thread to pick up when its event loop resumes (so if its doing
# a bunch of work you can call this method 10 times and it will just queue
# up the calls.  Note: PyQt > 4.6 will not allow you to pass in a None
# instead of an empty list, it has stricter type checking

app.exec_()
# Without this you may get weird QThread messages in the shell on exit
app.deleteLater()

基于其他答案中提到的Worker对象方法,我决定看看能否扩展该解决方案以启动更多线程——在本例中是机器能够最优运行的线程数量——并启动多个完成时间不确定的工作对象。 要做到这一点,我仍然需要继承QThread——但只是为了分配线程编号,并“重新实现”'finished'和'started'信号,使它们带上线程编号。

我主要关注了主要的主线,线程和工人之间的信号。

同样,其他答案都煞费苦心地指出不要给QThread设置父对象,但我认为这并不是真正的问题。 不过,我的代码也会小心地销毁QThread对象。

但是,我无法给工作对象设置父对象,因此在线程函数完成或GUI被销毁时,对它们调用deleteLater()似乎是可取的。 我自己的代码就曾因为没有这样做而挂起。

我认为有必要的另一个改进是重新实现GUI(QWidget)的closeEvent,这样线程会被通知退出,然后GUI会一直等待,直到所有线程都结束。 在我试验这个问题的其他答案时,曾遇到QThread被销毁(destroyed)的错误。

也许这会对别人有用。 我当然认为这是一个有用的练习。 也许别人会知道一个更好的方式来让线程宣布它的身份。

 #!/usr/bin/env python3
#coding:utf-8
# Author:   --<>
# Purpose:  To demonstrate creation of multiple threads and identify the
#           receipt of thread results
# Created: 19/12/15
import sys

from PyQt4.QtCore import QThread, pyqtSlot, pyqtSignal
from PyQt4.QtGui import QApplication, QLabel, QWidget, QGridLayout

import worker


class Thread(QThread):
    """``QThread`` that re-emits ``started``/``finished`` with its own id."""

    # make new signals to be able to return an id for the thread
    startedx = pyqtSignal(int)
    finishedx = pyqtSignal(int)

    def __init__(self, i, parent=None):
        """Store the id ``i`` and forward the built-in signals to the
        id-carrying ones."""
        super().__init__(parent)
        self.idd = i

        self.started.connect(self.starttt)
        self.finished.connect(self.finisheddd)

    @pyqtSlot()
    def starttt(self):
        """Re-emit ``started`` as ``startedx(idd)``."""
        print('started signal from thread emitted')
        self.startedx.emit(self.idd)

    @pyqtSlot()
    def finisheddd(self):
        """Re-emit ``finished`` as ``finishedx(idd)``."""
        print('finished signal from thread emitted')
        self.finishedx.emit(self.idd)


class Form(QWidget):
    """Window that runs one worker per ideal thread and shows the results
    of the first four workers in labels."""

    def __init__(self):
        """Build the UI, then set up and start ``idealThreadCount()``
        worker/thread pairs."""
        super().__init__()
        self.initUI()
        self.worker = {}
        self.threadx = {}
        self.i = 0

        # Establish the maximum number of threads the machine can optimally
        # handle.  Generally relates to the number of processors
        self.threadtest = QThread(self)
        self.idealthreadcount = self.threadtest.idealThreadCount()

        print("This machine can handle {} threads optimally".format(self.idealthreadcount))

        # Wire up every pair first, then start them all.
        for i in range(self.idealthreadcount):
            self.setupThread(i)

        for i in range(self.idealthreadcount):
            self.startThread(i)

        print("Main Gui running in thread {}.".format(self.thread()))

    def setupThread(self, i):
        """Create worker ``i`` and thread ``i``, connect their signals and
        move the worker onto the thread."""
        self.worker[i] = worker.Worker(i)  # no parent!
        # if parent isn't specified then need to be careful to destroy thread
        self.threadx[i] = Thread(i, parent=self)
        # Fill the placeholder with the id; the previous string concatenation
        # left a literal "{}" inside the object name.
        self.threadx[i].setObjectName("python thread{}".format(i))

        self.threadx[i].startedx.connect(self.threadStarted)
        self.threadx[i].finishedx.connect(self.threadFinished)

        self.worker[i].finished.connect(self.workerFinished)
        self.worker[i].intReady.connect(self.workerResultReady)

        # The next line is optional, you may want to start the threads again
        # without having to create all the code again.
        self.worker[i].finished.connect(self.threadx[i].quit)

        self.threadx[i].started.connect(self.worker[i].procCounter)

        self.destroyed.connect(self.threadx[i].deleteLater)
        self.destroyed.connect(self.worker[i].deleteLater)

        # This is the key code that actually get the worker code onto another
        # processor or thread.
        self.worker[i].moveToThread(self.threadx[i])

    def startThread(self, i):
        """Start thread ``i``."""
        self.threadx[i].start()

    @pyqtSlot(int)
    def threadStarted(self, i):
        """Report that thread ``i`` started, together with its priority."""
        print('Thread {}  started'.format(i))
        print("Thread priority is {}".format(self.threadx[i].priority()))

    @pyqtSlot(int)
    def threadFinished(self, i):
        """Report that thread ``i`` finished."""
        print('Thread {} finished'.format(i))

    @pyqtSlot(int)
    def threadTerminated(self, i):
        """Report that thread ``i`` terminated."""
        print("Thread {} terminated".format(i))

    @pyqtSlot(int, int)
    def workerResultReady(self, j, i):
        """Show result ``j`` of worker ``i`` in that worker's label.

        Only four labels exist, so results of workers with an id outside
        0..3 are printed but not displayed.
        """
        print('Worker {} result returned'.format(i))
        labels = (self.label1, self.label2, self.label3, self.label4)
        if 0 <= i < len(labels):
            labels[i].setText("{}".format(j))

    @pyqtSlot(int)
    def workerFinished(self, i):
        """Report that worker ``i`` finished."""
        print('Worker {} finished'.format(i))

    def initUI(self):
        """Create the four result labels in a grid and show the window."""
        self.label1 = QLabel("0")
        self.label2 = QLabel("0")
        self.label3 = QLabel("0")
        self.label4 = QLabel("0")
        grid = QGridLayout(self)
        self.setLayout(grid)
        grid.addWidget(self.label1, 0, 0)
        grid.addWidget(self.label2, 0, 1)
        grid.addWidget(self.label3, 0, 2)
        grid.addWidget(self.label4, 0, 3)  # Layout parents the self.labels

        self.move(300, 150)
        self.setGeometry(0, 0, 300, 300)
        self.setWindowTitle('thread test')
        self.show()

    def closeEvent(self, event):
        """Ask every thread to quit and wait for all of them before
        accepting the close event."""
        print('Closing')

        # this tells the threads to stop running
        for i in range(self.idealthreadcount):
            self.threadx[i].quit()

        # this ensures window cannot be closed until the threads have
        # finished.
        for i in range(self.idealthreadcount):
            self.threadx[i].wait()

        event.accept()


if __name__ == '__main__':
    app = QApplication(sys.argv)
    form = Form()
    sys.exit(app.exec_())

下面是工作对象(Worker)的代码

 #!/usr/bin/env python3
#coding:utf-8
# Author:   --<>
# Purpose:  Stack Overflow
# Created: 19/12/15
import sys
import unittest
import time
import random

from PyQt4.QtCore import QThread, QObject, pyqtSignal, pyqtSlot


class Worker(QObject):
    """Counter with an id that reports progress after random pauses."""

    # Emitted with the worker id when ``procCounter`` is done.
    finished = pyqtSignal(int)
    # Emitted as (count, worker id) after each pause.
    intReady = pyqtSignal(int, int)

    def __init__(self, i=0):
        '''__init__ is called while the worker is still in the Gui thread.
        Do not put slow or CPU intensive code in the __init__ method'''
        super().__init__()
        self.idd = i

    @pyqtSlot()
    def procCounter(self):  # This slot takes no params
        """Count from 1 to 9; before each report sleep for a Weibull(1, 2)
        distributed time, emit ``intReady`` and print which thread hosts
        the worker.  Emit ``finished`` at the end."""
        for step in range(1, 10):
            time.sleep(random.weibullvariate(1, 2))
            self.intReady.emit(step, self.idd)
            # Reads ``idd`` from the hosting thread object.
            host = self.thread()
            print('Worker {0} in thread {1}'.format(self.idd, host.idd))

        self.finished.emit(self.idd)


if __name__ == '__main__':
    unittest.main()

在PyQt中有很多方式可以获得异步行为。 对于需要事件处理的场景(例如QtNetwork等),你应该使用我在本问题的另一个答案中提供的QThread示例。 但是对于绝大多数线程需求,我认为这个解决方案远胜于其他方法。

这样做的好处是QThreadPool会把你的QRunnable实例作为任务来调度。 这与英特尔TBB中使用的任务模式类似。 它并不像我希望的那样优雅,但确实能够实现出色的异步行为。

这让你可以通过QRunnable利用Python中的大部分线程能力,同时仍然能使用信号和槽。 我在好几个应用程序中使用了同样的代码:有的会发出几百个异步REST调用,有的会打开文件或列出目录。最棒的是,使用这种方法时,Qt的任务调度会替我平衡系统资源。

 # Run callables on the global QThreadPool and deliver results via signals.
import time

from PyQt4 import QtCore
from PyQt4 import QtGui
from PyQt4.QtCore import Qt

try:
    unicode
except NameError:
    # Python 3 has no ``unicode`` builtin; ``str`` is the text type there.
    unicode = str


def run_async(method, args, uid, readycb, errorcb=None):
    """
    Asynchronously runs a task

    :param func method: the method to run in a thread
    :param list args: list of arguments to pass to method
    :param object uid: a unique identifier for this task (used for verification)
    :param slot readycb: the callback when data is receieved cb(uid, data)
    :param slot errorcb: the callback when there is an error cb(uid, errmsg)

    The uid option is useful when the calling code makes multiple async calls
    and the callbacks need some context about what was sent to the async method.
    For example, if you use this method to thread a long running database call
    and the user decides they want to cancel it and start a different one, the
    first one may complete before you have a chance to cancel the task.  In that
    case, the "readycb" will be called with the cancelled task's data.  The uid
    can be used to differentiate those two calls (ie. using the sql query).

    :returns: Request instance
    """
    request = Request(method, args, uid, readycb, errorcb)
    QtCore.QThreadPool.globalInstance().start(request)
    return request


# ``async`` is a reserved word since Python 3.7, so the function can no longer
# be defined under that name.  Keep the old name reachable as a module
# attribute for existing callers on interpreters where it is a valid name.
globals()["async"] = run_async


class Request(QtCore.QRunnable):
    """
    A Qt object that represents an asynchronous task

    :param func method: the method to call
    :param list args: list of arguments to pass to method
    :param object uid: a unique identifier (used for verification)
    :param slot readycb: the callback used when data is receieved
    :param slot errorcb: the callback used when there is an error

    The uid param is sent to your error and update callbacks as the
    first argument. It's there to verify the data you're returning

    After created it should be used by invoking:

    .. code-block:: python

       task = Request(...)
       QtCore.QThreadPool.globalInstance().start(task)

    """
    # Requests that have been created and not yet cleaned up.
    INSTANCES = []
    # Cleaned-up requests, held until the next request is created.
    FINISHED = []

    def __init__(self, method, args, uid, readycb, errorcb=None):
        super(Request, self).__init__()
        self.setAutoDelete(True)
        self.cancelled = False

        self.method = method
        self.args = args
        self.uid = uid
        self.dataReady = readycb
        self.dataError = errorcb

        Request.INSTANCES.append(self)

        # release all of the finished tasks
        Request.FINISHED = []

    def run(self):
        """
        Method automatically called by Qt when the runnable is ready to run.
        This will run in a separate thread.
        """
        # this allows us to "cancel" queued tasks if needed, should be done
        # on shutdown to prevent the app from hanging
        if self.cancelled:
            self.cleanup()
            return

        # runs in a separate thread, for proper async signal/slot behavior
        # the object that emits the signals must be created in this thread.
        # Its not possible to run grabber.moveToThread(QThread.currentThread())
        # so to get this QObject to properly exhibit asynchronous
        # signal and slot behavior it needs to live in the thread that
        # we're running in, creating the object from within this thread
        # is an easy way to do that.
        grabber = Requester()
        grabber.Loaded.connect(self.dataReady, Qt.QueuedConnection)
        if self.dataError is not None:
            grabber.Error.connect(self.dataError, Qt.QueuedConnection)

        try:
            result = self.method(*self.args)
            if self.cancelled:
                # cleanup happens in 'finally' statement
                return
            grabber.Loaded.emit(self.uid, result)
        except Exception as error:
            if self.cancelled:
                # cleanup happens in 'finally' statement
                return
            grabber.Error.emit(self.uid, unicode(error))
        finally:
            # this will run even if one of the above return statements
            # is executed inside of the try/except statement see:
            # https://docs.python.org/2.7/tutorial/errors.html#defining-clean-up-actions
            self.cleanup(grabber)

    def cleanup(self, grabber=None):
        """Drop every reference held by this request, schedule ``grabber``
        (if given) for deletion and unregister the request."""
        # remove references to any object or method for proper ref counting
        self.method = None
        self.args = None
        self.uid = None
        self.dataReady = None
        self.dataError = None

        if grabber is not None:
            grabber.deleteLater()

        # make sure this python obj gets cleaned up
        self.remove()

    def remove(self):
        """Move this request from ``INSTANCES`` to ``FINISHED``."""
        try:
            Request.INSTANCES.remove(self)

            # when the next request is created, it will clean this one up
            # this will help us avoid this object being cleaned up
            # when it's still being used
            Request.FINISHED.append(self)
        except ValueError:
            # there might be a race condition on shutdown, when shutdown()
            # is called while the thread is still running and the instance
            # has already been removed from the list
            return

    @staticmethod
    def shutdown():
        """Mark every registered request as cancelled and clear both
        registries."""
        for inst in Request.INSTANCES:
            inst.cancelled = True
        Request.INSTANCES = []
        Request.FINISHED = []


class Requester(QtCore.QObject):
    """
    A simple object designed to be used in a separate thread to allow
    for asynchronous data fetching
    """

    #
    # Signals
    #

    # Emitted if the fetch fails for any reason
    #
    # :param unicode uid: an id to identify this request
    # :param unicode error: the error message
    Error = QtCore.pyqtSignal(object, unicode)

    # Emitted whenever data comes back successfully
    #
    # :param unicode uid: an id to identify this request
    # :param list data: the json list returned from the GET
    Loaded = QtCore.pyqtSignal(object, object)

    # Emitted when the task fails due to a network connection error
    #
    # :param unicode message: network connection error message
    NetworkConnectionError = QtCore.pyqtSignal(unicode)

    def __init__(self, parent=None):
        super(Requester, self).__init__(parent)


class ExampleObject(QtCore.QObject):
    """Example caller that keeps only the most recent request relevant."""

    def __init__(self, parent=None):
        super(ExampleObject, self).__init__(parent)
        self.uid = 0
        self.request = None

    def ready_callback(self, uid, result):
        """Print ``result`` unless it belongs to a superseded request."""
        if uid != self.uid:
            return
        # Parenthesised single-argument print behaves the same whether
        # ``print`` is a statement or a function.
        print("Data ready from %s: %s" % (uid, result))

    def error_callback(self, uid, error):
        """Print ``error`` unless it belongs to a superseded request."""
        if uid != self.uid:
            return
        print("Data error from %s: %s" % (uid, error))

    def fetch(self):
        """Cancel the pending request, if any, and start a new one under a
        fresh uid."""
        if self.request is not None:
            # cancel any pending requests
            self.request.cancelled = True
            self.request = None

        self.uid += 1
        self.request = run_async(slow_method, ["arg1", "arg2"], self.uid,
                                 self.ready_callback,
                                 self.error_callback)


def slow_method(arg1, arg2):
    """Print a marker, sleep one second and return ``arg1 + arg2``."""
    print("Starting slow method")
    time.sleep(1)
    return arg1 + arg2


if __name__ == "__main__":
    import sys
    app = QtGui.QApplication(sys.argv)

    obj = ExampleObject()

    dialog = QtGui.QDialog()
    layout = QtGui.QVBoxLayout(dialog)
    button = QtGui.QPushButton("Generate", dialog)
    progress = QtGui.QProgressBar(dialog)
    progress.setRange(0, 0)
    layout.addWidget(button)
    layout.addWidget(progress)
    button.clicked.connect(obj.fetch)
    dialog.show()

    app.exec_()
    app.deleteLater()  # avoids some QThread messages in the shell on exit
    # cancel all running tasks avoid QThread/QTimer error messages
    # on exit
    Request.shutdown()

退出应用程序时,您需要确保取消所有的任务,否则应用程序将挂起,直到每个计划任务完成