在Java中有一个“阻塞直到条件成真”的function吗?
我正在为一个服务器编写一个监听线程,目前我正在使用:
while (true){ try { if (condition){ //do something condition=false; } sleep(1000); } catch (InterruptedException ex){ Logger.getLogger(server.class.getName()).log(Level.SEVERE, null, ex); } }
有了上面的代码,我遇到了运行函数吃掉所有cpu时间循环的问题。 睡眠function的作品,但它似乎是一个临时修复,而不是解决scheme。
是否有一些函数会阻塞,直到variables“条件”变成“真”? 或者是持续循环等待一个variables的值改变的标准方法?
像这样的投票肯定是最不喜欢的解决scheme。
我认为你有另一个线程会做一些事情,使条件成立。 有几种方法来同步线程。 在你的情况最简单的将是通过一个对象的通知:
主线程:
synchronized(syncObject) { try { // Calling wait() will block this thread until another thread // calls notify() on the object. syncObject.wait(); } catch (InterruptedException e) { // Happens if someone interrupts your thread. } }
其他线程:
// Do something // If the condition is true, do the following: synchronized(syncObject) { syncObject.notify(); }
syncObject本身可以是一个简单的Object
。
还有很多其他的线程间通信方式,但是使用哪一种方式取决于你在做什么。
EboMike的回答和Toby的回答都是正确的,但都包含一个致命的缺陷。 该漏洞被称为丢失通知 。
问题是,如果一个线程调用foo.notify()
,除非其他线程已经在foo.wait()
调用中hibernate,否则它将不会执行任何操作。 对象foo
不记得它被通知了。
有一个原因,除非线程在foo上同步,否则不允许调用foo.wait()
或foo.notify()
。 这是因为避免丢失通知的唯一方法就是用互斥体来保护条件。 完成之后,看起来像这样:
消费者线程:
try { synchronized(foo) { while(! conditionIsTrue()) { foo.wait(); } doSomethingThatRequiresConditionToBeTrue(); } } catch (InterruptedException e) { handleInterruption(); }
生产者线程:
synchronized(foo) { doSomethingThatMakesConditionTrue(); foo.notify(); }
更改条件的代码和检查条件的代码在同一个对象上全部同步,并且消费者线程在等待之前显式地testing条件。 当条件已经成立时,消费者没有办法错过通知,并最终永远停留在wait()
调用中。
还要注意wait()
是在一个循环中。 这是因为,在一般情况下,当消费者重新获得foo
锁并醒来时,其他一些线程可能再次使该条件失效。 即使在你的程序中这是不可能的,在某些操作系统中,甚至当foo.notify()
没有被调用时, foo.notify()
也可能返回。 这就是所谓的虚假唤醒 ,并且允许发生这种情况,因为它使等待/通知更容易在某些操作系统上实现。
与EboMike的答案类似,您可以使用类似于wait / notify / notifyAll的机制,但是可以使用Lock
。
例如,
public void doSomething() throws InterruptedException { lock.lock(); try { condition.await(); // releases lock and waits until doSomethingElse is called } finally { lock.unlock(); } } public void doSomethingElse() { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } }
在那里,你会等待一些条件,这是由另一个线程通知(在这种情况下调用doSomethingElse
),在这一点上,第一个线程将继续…
在内部同步中使用Lock
有很多好处,但是我只是希望有一个明确的Condition
对象来表示这个条件(你可以有多于一个对于生产者 – 消费者来说是一个很好的接触)。
另外,我不禁会注意到,在你的例子中你如何处理被中断的exception。 你可能不应该像这样消耗这个exception,而是使用Thread.currentThrad().interrupt
来重置中断状态标志。
这是因为如果抛出exception,中断状态标志将被重置(它的意思是“ 我不再记得被中断,我不能告诉其他人,如果他们问的话,我一直在 )”,另一个过程可能依赖于这个问题。 这个例子就是其他的东西在这个基础上实现了一个中断策略… phew。 另一个例子可能是你是中断策略,而while(true)
可能已经实现为while(!Thread.currentThread().isInterrupted()
(这也将使你的代码更加…社会上体贴) 。
因此,总而言之,当你想使用Lock
时,使用Condition
是等同于使用wait / notify / notifyAll的,日志是邪恶的并且吞咽InterruptedException
是调皮的;)
你可以使用信号量 。
当条件不满足时,另一个线程获取信号量。
你的线程将试图通过acquireUninterruptibly()
获取它
或者tryAcquire(int permits, long timeout, TimeUnit unit)
并被阻塞。
当条件满足时,信号量也被释放,你的线程将获得它。
您也可以尝试使用SynchronousQueue或CountDownLatch 。
没有人用CountDownLatch发布解决scheme。 关于什么:
public class Lockeable{ private final CountDownLatch countDownLatch = new CountDownLatch(1); public void doAfterEvent(){ countDownLatch.await(); doSomething(); } public void reportDetonatingEvent(){ countDownLatch.countDown(); } }
无锁解决scheme(?)
我有同样的问题,但我想要一个解决scheme,不使用锁。
问题:我最多只有一个线程从队列中消耗。 多个生产者线程不断插入队列中,需要通知消费者是否在等待。 队列是无锁的,所以使用锁通知会导致生产者线程中不必要的阻塞。 每个生产者线程在通知等待的消费者之前需要获得该锁。 我相信我想出了一个使用LockSupport
和AtomicReferenceFieldUpdater
的无锁解决scheme。 如果JDK中存在无锁屏障,我找不到它。 CyclicBarrier
和CoundDownLatch
使用我能find的内部锁。
这是我稍微缩写的代码。 只是要清楚,这个代码将只允许一个线程一次等待。 它可以被修改,以允许多个awaiters /消费者通过使用某种types的primefaces集合来存储多个所有者(一个ConcurrentMap
可能工作)。
我已经使用这个代码,它似乎工作。 我没有广泛的testing。 我build议你在使用之前阅读LockSupport
的文档。
/* I release this code into the public domain. * http://unlicense.org/UNLICENSE */ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.LockSupport; /** * A simple barrier for awaiting a signal. * Only one thread at a time may await the signal. */ public class SignalBarrier { /** * The Thread that is currently awaiting the signal. * !!! Don't call this directly !!! */ @SuppressWarnings("unused") private volatile Thread _owner; /** Used to update the owner atomically */ private static final AtomicReferenceFieldUpdater<SignalBarrier, Thread> ownerAccess = AtomicReferenceFieldUpdater.newUpdater(SignalBarrier.class, Thread.class, "_owner"); /** Create a new SignalBarrier without an owner. */ public SignalBarrier() { _owner = null; } /** * Signal the owner that the barrier is ready. * This has no effect if the SignalBarrer is unowned. */ public void signal() { // Remove the current owner of this barrier. Thread t = ownerAccess.getAndSet(this, null); // If the owner wasn't null, unpark it. if (t != null) { LockSupport.unpark(t); } } /** * Claim the SignalBarrier and block until signaled. * * @throws IllegalStateException If the SignalBarrier already has an owner. * @throws InterruptedException If the thread is interrupted while waiting. */ public void await() throws InterruptedException { // Get the thread that would like to await the signal. Thread t = Thread.currentThread(); // If a thread is attempting to await, the current owner should be null. if (!ownerAccess.compareAndSet(this, null, t)) { throw new IllegalStateException("A second thread tried to acquire a signal barrier that is already owned."); } // The current thread has taken ownership of this barrier. // Park the current thread until the signal. Record this // signal barrier as the 'blocker'. LockSupport.park(this); // If a thread has called #signal() the owner should already be null. // However the documentation for LockSupport.unpark makes it clear that // threads can wake up for absolutely no reason. Do a compare and set // to make sure we don't wipe out a new owner, keeping in mind that only // thread should be awaiting at any given moment! ownerAccess.compareAndSet(this, t, null); // Check to see if we've been unparked because of a thread interrupt. if (t.isInterrupted()) throw new InterruptedException(); } /** * Claim the SignalBarrier and block until signaled or the timeout expires. * * @throws IllegalStateException If the SignalBarrier already has an owner. * @throws InterruptedException If the thread is interrupted while waiting. * * @param timeout The timeout duration in nanoseconds. * @return The timeout minus the number of nanoseconds that passed while waiting. */ public long awaitNanos(long timeout) throws InterruptedException { if (timeout <= 0) return 0; // Get the thread that would like to await the signal. Thread t = Thread.currentThread(); // If a thread is attempting to await, the current owner should be null. if (!ownerAccess.compareAndSet(this, null, t)) { throw new IllegalStateException("A second thread tried to acquire a signal barrier is already owned."); } // The current thread owns this barrier. // Park the current thread until the signal. Record this // signal barrier as the 'blocker'. // Time the park. long start = System.nanoTime(); LockSupport.parkNanos(this, timeout); ownerAccess.compareAndSet(this, t, null); long stop = System.nanoTime(); // Check to see if we've been unparked because of a thread interrupt. if (t.isInterrupted()) throw new InterruptedException(); // Return the number of nanoseconds left in the timeout after what we // just waited. return Math.max(timeout - stop + start, 0L); } }
为了给出一个模糊的例子,我将采用james large的例子:
SignalBarrier barrier = new SignalBarrier();
消费者线程(单数, 不是复数! ):
try { while(!conditionIsTrue()) { barrier.await(); } doSomethingThatRequiresConditionToBeTrue(); } catch (InterruptedException e) { handleInterruption(); }
生产者线程:
doSomethingThatMakesConditionTrue(); barrier.signal();