将inputstream连接到输出stream
在java9中更新: https ://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-
我看到了一些类似的,但并不完全是我需要的线程。
我有一个服务器,它将基本上从客户端,客户端A的input,并转发它,字节为字节,另一个客户端,客户端B.
我想将客户端A的inputstream与客户端B的输出stream连接起来。这可能吗? 有什么办法做到这一点?
而且,这些客户端正在发送对方有时间敏感的消息,所以缓冲是不行的。 我不想要一个500的缓冲区和一个客户端发送499字节,然后我的服务器暂缓转发500字节,因为它没有收到最后一个字节来填充缓冲区。
现在,我正在parsing每个消息来find它的长度,然后读取长度字节,然后转发它们。 我认为(和testing)这会比读一个字节和转发一个字节反复,因为这将是非常缓慢的。 我也不想使用缓冲区或计时器,因为我在上一段中提到的原因 – 我不希望消息等待很长时间才能通过,因为缓冲区未满。
什么是这样做的好方法?
仅仅因为你使用缓冲区并不意味着stream必须填充缓冲区。 换句话说,这应该是好的:
public static void copyStream(InputStream input, OutputStream output) throws IOException { byte[] buffer = new byte[1024]; // Adjust if you want int bytesRead; while ((bytesRead = input.read(buffer)) != -1) { output.write(buffer, 0, bytesRead); } }
这应该工作得很好 – 基本上read
调用将阻塞,直到有一些数据可用,但它不会等到它可用于填充缓冲区。 (我想可以,而且我相信FileInputStream
通常会填充缓冲区,但是连接到套接字的stream更有可能立即为您提供数据。)
我认为至less首先尝试这个简单的解决scheme是值得的。
如何使用
void feedInputToOutput(InputStream in, OutputStream out) { IOUtils.copy(in, out); }
并完成它?
从雅加达apache的公共I / O库已经被大量的项目使用,所以你可能已经在你的classpath中有jar。
为了完整性, 番石榴也有这方面的方便的工具
ByteStreams.copy(input, output);
您可以使用循环缓冲区:
码
// buffer all data in a circular buffer of infinite size CircularByteBuffer cbb = new CircularByteBuffer(CircularByteBuffer.INFINITE_SIZE); class1.putDataOnOutputStream(cbb.getOutputStream()); class2.processDataFromInputStream(cbb.getInputStream());
Maven依赖
<dependency> <groupId>org.ostermiller</groupId> <artifactId>utils</artifactId> <version>1.07.00</version> </dependency>
模式细节
asynchronous的方式来实现它。
void inputStreamToOutputStream(final InputStream inputStream, final OutputStream out) { Thread t = new Thread(new Runnable() { public void run() { try { int d; while ((d = inputStream.read()) != -1) { out.write(d); } } catch (IOException ex) { //TODO make a callback on exception. } } }); t.setDaemon(true); t.start(); }
JDK 9为此function添加了InputStream#transferTo(OutputStream out)
。
BUFFER_SIZE是读入卡盘的大小。应该大于1kb,小于10MB。
private static final int BUFFER_SIZE = 2 * 1024 * 1024; private void copy(InputStream input, OutputStream output) throws IOException { try { byte[] buffer = new byte[BUFFER_SIZE]; int bytesRead = input.read(buffer); while (bytesRead != -1) { output.write(buffer, 0, bytesRead); bytesRead = input.read(buffer); } //If needed, close streams. } finally { input.close(); output.close(); } }
这是一个干净和快速的Scala版本(没有stackoverflow):
import scala.annotation.tailrec import java.io._ implicit class InputStreamOps(in: InputStream) { def >(out: OutputStream): Unit = pipeTo(out) def pipeTo(out: OutputStream, bufferSize: Int = 1<<10): Unit = pipeTo(out, Array.ofDim[Byte](bufferSize)) @tailrec final def pipeTo(out: OutputStream, buffer: Array[Byte]): Unit = in.read(buffer) match { case n if n > 0 => out.write(buffer, 0, n) pipeTo(out, buffer) case _ => in.close() out.close() } }
这使得能够使用>
符号例如inputinputstream > outputstream
并且还传递定制缓冲器/尺寸。
使用org.apache.commons.io.IOUtils
InputStream inStream = new ... OutputStream outStream = new ... IOUtils.copy(inStream, outStream);
或copyLarge大小> 2GB
如果你进入了function,这是一个用Scala编写的函数,展示了如何使用val(而不是variables)将inputstream复制到输出stream。
def copyInputToOutputFunctional(inputStream: InputStream, outputStream: OutputStream,bufferSize: Int) { val buffer = new Array[Byte](bufferSize); def recurse() { val len = inputStream.read(buffer); if (len > 0) { outputStream.write(buffer.take(len)); recurse(); } } recurse(); }
请注意,这是不build议在一个可用内存很小的Java应用程序中使用,因为使用recursion函数,您可以很容易地得到一个堆栈溢出exception错误