Канал потока ввода и вывода в Java

Есть ли у кого-нибудь хорошие предложения для создания объекта Pipe в Java, который является одновременно InputStream и OutputStream, поскольку Java не имеет множественного наследования и оба потока являются абстрактными классами вместо интерфейсов?

Основная потребность заключается в том, чтобы иметь один объект, который можно передавать вещам, которым требуется либо InputStream, либо OutputStream для передачи вывода из одного потока во вход для другого.

13.12.2008 06:49:13
6 ОТВЕТОВ
РЕШЕНИЕ

Кажется, смысл этого вопроса упущен. Если я вас правильно понимаю, вам нужен объект, который работает как InputStream в одном потоке и OutputStream в другом, чтобы создать средство связи между двумя потоками.

Возможно, один из ответов - использовать композицию вместо наследования (что в любом случае рекомендуется). Создайте канал, который содержит PipedInputStream и PipedOutputStream, связанные друг с другом, с помощью методов getInputStream () и getOutputStream ().

Вы не можете напрямую передать объект Pipe чему-либо, нуждающемуся в потоке, но вы можете передать возвращаемое значение метода get, чтобы сделать это.

Это работает для вас?

8
13.12.2008 23:25:02
Как добавить PipedInputStream и PipedOutputStream в канал и подключить их?
Sabobin 22.05.2013 12:55:52

Я думаю, это довольно распространенная вещь. Смотрите этот вопрос.

Простой способ записать содержимое Java InputStream в OutputStream

3
23.05.2017 11:46:58
Я знаю о PipedXxxStream ... но я хотел создать только один объект Pipe, который мог бы быть передан как InputStream одному потоку и OutputStream другому. Я надеялся, что мог что-то пропустить.
Baginsss 13.12.2008 07:35:19
Не было бы очень сложно написать что-то, что может дать вам входной поток, который направляет к выходному потоку, но вы не сможете расширить и InputStream, и OutputStream. Наследование, плохо. Композиция хорошая.
Apocalisp 13.12.2008 08:14:43

java.io.PipedOutputStream и java.io.PipedInputStream выглядят как классы для использования в этом сценарии. Они предназначены для совместного использования для передачи данных между потоками.

Если вы действительно хотите, чтобы какой-то один объект проходил вокруг, он должен содержать один из них и выставлять их через геттеры.

5
13.12.2008 12:22:06

Вы не можете создать класс, который является производным от InputStreamи OutputStreamпотому, что они не являются интерфейсами и имеют общие методы, а Java не допускает множественное наследование (компилятор не знает, вызывать InputStream.close()или OutputStream.close()вызывать close()ваш новый объект) ,

Другая проблема - буфер. Java хочет выделить статический буфер для данных (который не изменяется). Это означает, что когда вы используете `java.io.PipedXxxStream ', запись данных в него в конечном итоге блокируется, если вы не используете два разных потока.

Таким образом, ответ от Apocalisp правильный: вы должны написать цикл копирования.

Я предлагаю вам включить Apache commons-io в ваш проект, который содержит множество вспомогательных процедур только для таких задач, как это (копирование данных между потоками, файлами, строками и всеми их комбинациями).

1
13.12.2008 09:23:02
Вы можете использовать внутренние классы для достижения чего-то похожего на один класс, открывающий поток ввода и вывода при совместном использовании буфера.
Charlie 8.10.2015 06:08:08

Мне пришлось реализовать фильтр для медленных соединений с сервлетами, поэтому в основном я обернул выходной поток сервлета в QueueOutputStream, который будет добавлять каждый байт (в небольших буферах) в очередь, а затем выводить эти небольшие буферы во 2-й поток вывода, поэтому в некотором смысле это действует как поток ввода / вывода, ИМХО это лучше, чем каналы JDK, которые не будут так хорошо масштабироваться, в основном в стандартной реализации JDK слишком много переключений контекста (для чтения / записи), очередь блокировки просто идеально подходит для сценария одного производителя / потребителя:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
0
19.10.2013 12:23:21