为什么没有更多的Java代码使用PipedInputStream / PipedOutputStream?

最近我已经“发现”了这个成语,而且我想知道是否有什么东西丢失了。 我从来没有见过它使用。 几乎所有与“野外”合作的Java代码都倾向于将数据浸入string或缓冲区,而不是像本示例中那样(使用HttpClient和XML API):

final LSOutput output; // XML stuff initialized elsewhere final LSSerializer serializer; final Document doc; // ... PostMethod post; // HttpClient post request final PipedOutputStream source = new PipedOutputStream(); PipedInputStream sink = new PipedInputStream(source); // ... executor.execute(new Runnable() { public void run() { output.setByteStream(source); serializer.write(doc, output); try { source.close(); } catch (IOException e) { throw new RuntimeException(e); } }}); post.setRequestEntity(new InputStreamRequestEntity(sink)); int status = httpClient.executeMethod(post); 

该代码使用Unixpipe道风格的技术来防止保存在内存中的XML数据的多个副本。 它使用HTTP Post OutputStream和DOM Load / Save API将XML Document序列化为HTTP请求的内容。 据我可以告诉它使用很less的额外代码(只是Runnable,PipedInputStream和PipedOutputStream的几行)最小化内存的使用。

那这个成语怎么了? 如果这个成语没有错,为什么我没看到呢?

编辑:澄清,PipedInputStream和PipedOutputStreamreplace逐个显示的样板缓冲区副本,并且它们还允许您同时处理传入的数据并写出处理的数据。 他们不使用OSpipe道。

从Javadocs :

通常,数据是通过一个线程从PipedInputStream对象中读取的,而数据则通过其他线程写入相应的PipedOutputStream。 不build议尝试从单个线程使用这两个对象,因为它可能会使线程死锁。

这可能部分解释了为什么它不是更常用。

我假设另一个原因是许多开发人员不明白其目的/好处。

在你的例子中,你正在创build两个线程来完成可以完成的工作。 并将I / O延迟引入混合中。

你有更好的例子吗? 或者我只是回答你的问题。


把一些评论(至less是我对他们的看法)引入主要的回应:

  • 并发将复杂性引入到应用程序中。 现在,您不必处理单个线性数据stream,而必须关注独立数据stream的sorting。 在某些情况下,增加的复杂性可能是合理的,特别是如果您可以利用多个内核/ CPU来执行CPU密集型工作。
  • 如果您处于可以从并发操作中受益的情况下,通常会有更好的方法来协调线程之间的数据stream。 例如,使用并发队列在线程之间传递对象,而不是将pipe道stream包装在对象stream中。
  • 如果一个pipe道stream可能是一个好的解决scheme,那么当你有多个线程执行文本处理的时候,一个Unixpipe道(例如:grep | sort)。

在具体的例子中,pipe道stream允许使用由HttpClient提供的现有的RequestEntity实现类。 我认为更好的解决scheme是创build一个新的实现类,如下所示,因为该示例最终是一个顺序操作,不能从并发实现的复杂性和开销中受益。 虽然我将RequestEntity显示为匿名类,但可重用性表明它应该是一stream的类。

 post.setRequestEntity(new RequestEntity() { public long getContentLength() { return 0-1; } public String getContentType() { return "text/xml"; } public boolean isRepeatable() { return false; } public void writeRequest(OutputStream out) throws IOException { output.setByteStream(out); serializer.write(doc, output); } }); 

我也只是最近才发现了PipedInputStream / PipedOutputStream类。

我正在开发一个Eclipse插件,需要通过SSH在远程服务器上执行命令。 我正在使用JSch和Channel API从inputstream读取并写入输出stream。 但是我需要通过inputstream提供命令,并从输出stream中读取响应。 那就是PipedInput / OutputStream进来的地方。

 import java.io.PipedInputStream; import java.io.PipedOutputStream; import com.jcraft.jsch.Channel; Channel channel; PipedInputStream channelInputStream = new PipedInputStream(); PipedOutputStream channelOutputStream = new PipedOutputStream(); channel.setInputStream(new PipedInputStream(this.channelOutputStream)); channel.setOutputStream(new PipedOutputStream(this.channelInputStream)); channel.connect(); // Write to channelInputStream // Read from channelInputStream channel.disconnect(); 

此外,回到原来的例子:不,它并不完全最小化内存使用。 DOM树已经build成,内存中的缓冲已经完成 – 虽然这比完整的字节数组副本要好,但并不是那么好。 但是在这种情况下缓冲会比较慢; 并且还创build了一个额外的线程 – 您不能在单个线程内使用PipedInput / OutputStream对。

有时PipedXxxStreams是有用的,但他们没有使用更多的原因是因为它们往往不是正确的解决scheme。 他们可以进行线程间的通信,这就是我用它们来实现这个价值的地方。 只是没有这么多的用例,因为SOA如何将大多数这样的边界推到服务之间,而不是在线程之间。

我尝试了一些使用这些类的东西,我忘记了细节。 但是我发现他们的实现是致命的缺陷。 我不记得是什么,但我有一个偷偷摸摸的记忆,这可能是一个竞争条件,这意味着他们偶尔会僵持(当然,我是在单独的线程中使用它们:它们根本不可用于单线程,并没有被devise为)。

我可能会看看他们的源代码,看看我能否看到问题可能是什么。

这里有一个用例,pipe道是有道理的:

假设你有一个第三方库,比如一个xslt映射器或者encryption库,它有这样一个接口:doSomething(inputStream,outputStream)。 在通过线路发送之前,您不想缓冲结果。 Apache和其他客户端不允许直接访问wire输出stream。 最接近的就是获取输出stream – 在写入标题之后的偏移量处,在请求实体对象中。 但是由于这是隐藏的,将inputstream和输出stream传递给第三方库还是不够的。 pipe道是解决这个问题的好办法。

顺便提一下,我使用Apache Commons HTTP Client 4.3.4编写了一个Apache的HTTP客户端API [PipedApacheClientOutputStream] ,它为HTTP POST提供了一个OutputStream接口。 这是一个Piped Streams可能有意义的例子。

那这个成语怎么了? 如果这个成语没有错,为什么我没看到呢?

编辑:澄清,PipedInputStream和PipedOutputStreamreplace逐个显示的样板缓冲区副本,并且它们还允许您同时处理传入的数据并写出处理的数据。 他们不使用OSpipe道。

你已经说明了它的作用,但没有说明你为什么这样做。

如果你相信这会减less使用的资源(CPU /内存)或提高性能,那么它也不会。 但是它会使你的代码更复杂。

基本上你有没有解决它的问题的解决scheme。

java.iopipe道有太多的上下文切换(每个字节读/写)和他们的java.nio对应需要你有一些NIO背景和正确的渠道和东西的使用,这是我自己的pipe道使用阻塞队列的实现一个单一的生产者/消费者将performance得很快,

 import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.*; public class QueueOutputStream extends OutputStream { private static final int DEFAULT_BUFFER_SIZE=1024; private static final byte[] END_SIGNAL=new byte[]{}; private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); private final byte[] buffer; private boolean closed=false; private int count=0; public QueueOutputStream() { this(DEFAULT_BUFFER_SIZE); } public QueueOutputStream(final int bufferSize) { if(bufferSize<=0){ throw new IllegalArgumentException("Buffer size <= 0"); } this.buffer=new byte[bufferSize]; } private synchronized void flushBuffer() { if(count>0){ final byte[] copy=new byte[count]; System.arraycopy(buffer,0,copy,0,count); queue.offer(copy); count=0; } } @Override public synchronized void write(final int b) throws IOException { if(closed){ throw new IllegalStateException("Stream is closed"); } if(count>=buffer.length){ flushBuffer(); } buffer[count++]=(byte)b; } @Override public synchronized void write(final byte[] b, final int off, final int len) throws IOException { super.write(b,off,len); } @Override public synchronized void close() throws IOException { flushBuffer(); queue.offer(END_SIGNAL); closed=true; } public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) { return executor.submit( new Callable<Void>() { @Override public Void call() throws Exception { try{ byte[] buffer=queue.take(); while(buffer!=END_SIGNAL){ outputStream.write(buffer); buffer=queue.take(); } outputStream.flush(); } catch(Exception e){ close(); throw e; } finally{ outputStream.close(); } return null; } } ); }