SwingWorker,done()在process()调用完成之前执行
我一直在与SwingWorker合作一段时间,结果出现了一个奇怪的行为,至less对我来说。 我清楚地明白,由于性能方面的原因,一次调用publish()方法的调用就被一次调用了。 这对我来说是完全有意义的,我怀疑SwingWorker会保留某种队列来处理所有的调用。
根据教程和API,当SwingWorker结束执行时, doInBackground()正常结束或工作线程从外部取消,然后调用done()方法。 到现在为止还挺好。
但是我有一个例子(类似于教程中显示的那样), 在 done()
方法执行之后 ,有process()
方法调用。 由于这两个方法都在Event Dispatch Thread中执行,我希望done()
在所有process()
调用完成后执行。 换一种说法:
预期:
Writing... Writing... Stopped!
结果:
Writing... Stopped! Writing...
示例代码
import java.awt.BorderLayout; import java.awt.Dimension; import java.awt.Graphics; import java.awt.event.ActionEvent; import java.util.List; import javax.swing.AbstractAction; import javax.swing.Action; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.SwingUtilities; import javax.swing.SwingWorker; public class Demo { private SwingWorker<Void, String> worker; private JTextArea textArea; private Action startAction, stopAction; private void createAndShowGui() { startAction = new AbstractAction("Start writing") { @Override public void actionPerformed(ActionEvent e) { Demo.this.startWriting(); this.setEnabled(false); stopAction.setEnabled(true); } }; stopAction = new AbstractAction("Stop writing") { @Override public void actionPerformed(ActionEvent e) { Demo.this.stopWriting(); this.setEnabled(false); startAction.setEnabled(true); } }; JPanel buttonsPanel = new JPanel(); buttonsPanel.add(new JButton(startAction)); buttonsPanel.add(new JButton(stopAction)); textArea = new JTextArea(30, 50); JScrollPane scrollPane = new JScrollPane(textArea); JFrame frame = new JFrame("Test"); frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); frame.add(scrollPane); frame.add(buttonsPanel, BorderLayout.SOUTH); frame.pack(); frame.setLocationRelativeTo(null); frame.setVisible(true); } private void startWriting() { stopWriting(); worker = new SwingWorker<Void, String>() { @Override protected Void doInBackground() throws Exception { while(!isCancelled()) { publish("Writing...\n"); } return null; } @Override protected void process(List<String> chunks) { String string = chunks.get(chunks.size() - 1); textArea.append(string); } @Override protected void done() { textArea.append("Stopped!\n"); } }; worker.execute(); } private void stopWriting() { if(worker != null && !worker.isCancelled()) { worker.cancel(true); } } public static void main(String[] args) { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { new Demo().createAndShowGui(); } }); } }
简答题:
发生这种情况是因为publish()不直接调度process
,它设置了一个计时器,它将在DELAY
之后启动EDT中的进程()块的调度。 所以当这个worker被取消的时候,还有一个计时器等待着用最后一个发布的数据来调度一个process()。 使用定时器的原因是为了实现可以用多个发布的组合数据执行单个进程的优化。
长答案:
让我们看看publish()和cancel是如何相互作用的,为此,让我们深入一些源代码。
首先容易的部分, cancel(true)
:
public final boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); }
这个取消结束了调用下面的代码:
boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; if (compareAndSetState(s, CANCELLED)) // <----- break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); // <----- } releaseShared(0); done(); // <----- return true; }
SwingWorker状态设置为CANCELLED
,线程被中断,并调用done()
,但是这不是SwingWorker的完成,而是future
done(),它是在SwingWorker构造函数中实例化时指定的:
future = new FutureTask<T>(callable) { @Override protected void done() { doneEDT(); // <----- setState(StateValue.DONE); } };
而doneEDT()
代码是:
private void doneEDT() { Runnable doDone = new Runnable() { public void run() { done(); // <----- } }; if (SwingUtilities.isEventDispatchThread()) { doDone.run(); // <----- } else { doSubmit.add(doDone); } }
如果我们在美国东部时间,我们就直接调用SwingWorkers done()
。 此时SwingWorker应该停止,不应该再调用publish()
,这很容易通过以下修改来演示:
while(!isCancelled()) { textArea.append("Calling publish\n"); publish("Writing...\n"); }
但是,我们仍然从process()中获得“Writing …”消息。 所以让我们看看process()是如何调用的。 publish(...)
的源代码是
protected final void publish(V... chunks) { synchronized (this) { if (doProcess == null) { doProcess = new AccumulativeRunnable<V>() { @Override public void run(List<V> args) { process(args); // <----- } @Override protected void submit() { doSubmit.add(this); // <----- } }; } } doProcess.add(chunks); // <----- }
我们看到Runnable doProcess
的run()
是最终调用process(args)
,但是这个代码只是调用doProcess.add(chunks)
而不是doProcess.run()
并且还有一个doSubmit
。 我们来看看doProcess.add(chunks)
。
public final synchronized void add(T... args) { boolean isSubmitted = true; if (arguments == null) { isSubmitted = false; arguments = new ArrayList<T>(); } Collections.addAll(arguments, args); // <----- if (!isSubmitted) { //This is what will make that for multiple publishes only one process is executed submit(); // <----- } }
那么publish()
实际上是把块join到一些内部的ArrayList arguments
然后调用submit()
。 我们刚刚看到提交只是调用doSubmit.add(this)
,这是非常相同的add
方法,因为doProcess
和doSubmit
扩展AccumulativeRunnable<V>
,但是这次V
是Runnable
而不是doProcess
的String
。 所以块是调用process(args)
的可运行process(args)
。 但是, submit()
调用是doSubmit
类中定义的完全不同的方法:
private static class DoSubmitAccumulativeRunnable extends AccumulativeRunnable<Runnable> implements ActionListener { private final static int DELAY = (int) (1000 / 30); @Override protected void run(List<Runnable> args) { for (Runnable runnable : args) { runnable.run(); } } @Override protected void submit() { Timer timer = new Timer(DELAY, this); // <----- timer.setRepeats(false); timer.start(); } public void actionPerformed(ActionEvent event) { run(); // <----- } }
它会创build一个Timer,在DELAY
毫秒后触发一次actionPerformed
代码。 一旦事件被触发,代码将在EDT中排队,这将调用一个内部run()
,最终调用doProcess
run(flush())
,从而执行process(chunk)
,其中chunk是arguments
的刷新数据ArrayList的。 我跳过一些细节,“运行”调用的链是这样的:
- doSubmit.run()
- doSubmit.run(flush())//实际上是一个可运行的循环,但只会有一个(*)
- doProcess.run()
- doProcess.run(冲洗())
- 处理(块)
(*)布尔值isSubmited
和flush()
(重置此布尔值)使其更多的调用发布不添加doProcess runnables在doSubmit.run(flush())中调用,但是他们的数据不会被忽略。 从而在定时器的生命周期中调用任意数量的发布。
总而言之, publish("Writing...")
所做的是在 DELAY 之后调度在EDT中process(chunk)
的调用。 这就解释了为什么即使我们取消了线程并且没有更多的发布完成之后,仍然出现了一个进程执行,因为当我们取消了工作者的时候(很有可能)有一个计时器在done()
之后调度一个process()
调度。
为什么使用这个Timer而不是用invokeLater(doProcess)
调度EDT中的process( invokeLater(doProcess)
? 为了实现文档中解释的性能优化:
由于在“事件派发线程”上asynchronous调用stream程方法,所以在执行stream程方法之前,可能会对publish方法执行多个调用。 出于性能的目的,所有这些调用都被合并成一个具有连接参数的调用。 例如:
publish("1"); publish("2", "3"); publish("4", "5", "6"); might result in: process("1", "2", "3", "4", "5", "6")
我们现在知道这是有效的,因为在DELAY时间间隔内发生的所有发布都将它们的args
添加到我们看到的arguments
内部variables中,并且process(chunk)
将一起执行所有的数据。
这是一个错误吗? 解决方法吗?
很难说如果这是一个错误,那么处理后台线程发布的数据可能是有意义的,因为工作实际上已经完成,您可能有兴趣使用尽可能多的信息更新GUI (如果这就是process()
正在做的事情)。 然后,如果done()
需要处理所有数据和/或在done()创build数据/ GUI不一致之后调用process(),则可能没有意义。
如果不想在done()之后执行任何新的进程(),那么只需检查worker process
是否被取消,就有一个明显的解决方法。
@Override protected void process(List<String> chunks) { if (isCancelled()) return; String string = chunks.get(chunks.size() - 1); textArea.append(string); }
在最后一个进程()之后执行done()会比较棘手,例如完成时也可以使用一个计时器来计划在DELAY之后的实际完成工作。 虽然我不认为这会是一个常见的情况,因为如果你取消了,当我们知道我们实际上取消了所有未来的执行时,错过一个进程应该不是很重要。
读过DSquare的高超的答案,并从中得出结论说,需要一些子类化,我想出了这个想法,任何需要确保所有已发布的块已经在EDT中处理过的人,然后再继续。
NB我试图用Java而不是Jython来编写它(我的select语言和正式的世界上最好的语言),但它有点复杂,因为例如publish
是final
,所以你必须发明另一种方法调用它,也因为你必须(打哈欠)用Java中的generics参数化所有东西。
这个代码应该是任何Java人都可以理解的:只要帮助self.publication_counter.get()
,当结果为0时,它的计算结果为False
。
# this is how you say Worker... is a subclass of SwingWorker in Python/Jython class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ): # __init__ is the constructor method in Python/Jython def __init__( self ): # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object self.publication_counter = java.util.concurrent.atomic.AtomicInteger() def await_processing_of_all_chunks( self ): while self.publication_counter.get(): time.sleep( 0.001 ) # fully functional override of the Java method def process( self, chunks ): for chunk in chunks: pass # DO SOMETHING WITH EACH CHUNK # decrement the counter by the number of chunks received # NB do this AFTER dealing with the chunks self.publication_counter.addAndGet( - len( chunks ) ) # fully functional override of the Java method def publish( self, *chunks ): # increment the counter by the number of chunks received # NB do this BEFORE publishing the chunks self.publication_counter.addAndGet( len( chunks )) self.super__publish( chunks )
所以在你的调用代码中,你把这样的东西:
engine.update_xliff_task.get() engine.update_xliff_task.await_processing_of_all_chunks()
PS使用这样一个while
子句(即轮询技术)几乎没有优雅。 我查看了可用的java.util.concurrent
类,如CountDownLatch
和Phaser
(都使用线程阻塞方法),但是我不认为两者都适合这个目的。
后来
我有兴趣在这个调整适当的并发类(用Java编写,在Apache网站上find)称为CounterLatch
。 如果达到AtomicLong
计数器的值,它们的版本会在await()
处停止线程。 我在这里的版本可以让你做到这一点,或相反:说:“等到计数器达到一个特定的价值,然后解除闩锁”:
注意AtomicLong
用于signal
和AtomicBoolean
用于released
:因为在原始Java中它们使用volatile
关键字。 我认为使用primefaces类将达到相同的目的。
class CounterLatch(): def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ): self.count = java.util.concurrent.atomic.AtomicLong( initial ) self.signal = java.util.concurrent.atomic.AtomicLong( wait_value ) class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ): def tryAcquireShared( sync_self, arg ): if lift_on_reached: return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1 else: return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1 def tryReleaseShared( self, args ): return True self.sync = Sync() self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False def await( self, *args ): if args: assert len( args ) == 2 assert type( args[ 0 ] ) is int timeout = args[ 0 ] assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit unit = args[ 1 ] return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) else: self.sync.acquireSharedInterruptibly( 1 ) def count_relative( self, n ): previous = self.count.addAndGet( n ) if previous == self.signal.get(): self.sync.releaseShared( 0 ) return previous
所以我现在的代码如下所示:
在SwingWorker构造函数中:
self.publication_counter_latch = CounterLatch()
在SW.publish:
self.publication_counter_latch.count_relative( len( chunks ) ) self.super__publish( chunks )
在等待块处理停止的线程中:
worker.publication_counter_latch.await()