001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import org.apache.commons.io.output.QueueOutputStream; 022 023import java.io.InputStream; 024import java.io.PipedInputStream; 025import java.io.PipedOutputStream; 026import java.util.Objects; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.LinkedBlockingQueue; 029 030/** 031 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue 032 * output stream. 033 * 034 * <p> 035 * Example usage: 036 * </p> 037 * <pre> 038 * QueueInputStream inputStream = new QueueInputStream(); 039 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 040 * 041 * outputStream.write("hello world".getBytes(UTF_8)); 042 * inputStream.read(); 043 * </pre> 044 * <p> 045 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a 046 * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current 047 * thread. Instances can be used longer after initial threads exited. 048 * </p> 049 * <p> 050 * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after the stream has been 051 * closed without generating an {@code IOException}. 052 * </p> 053 * 054 * @see QueueOutputStream 055 * @since 2.9.0 056 */ 057public class QueueInputStream extends InputStream { 058 059 private final BlockingQueue<Integer> blockingQueue; 060 061 /** 062 * Constructs a new instance with no limit to its internal buffer size. 063 */ 064 public QueueInputStream() { 065 this(new LinkedBlockingQueue<>()); 066 } 067 068 /** 069 * Constructs a new instance with given buffer 070 * 071 * @param blockingQueue backing queue for the stream 072 */ 073 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 074 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 075 } 076 077 /** 078 * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this 079 * input stream. 080 * 081 * @return QueueOutputStream connected to this stream 082 */ 083 public QueueOutputStream newQueueOutputStream() { 084 return new QueueOutputStream(blockingQueue); 085 } 086 087 /** 088 * Reads and returns a single byte. 089 * 090 * @return either the byte read or {@code -1} if the end of the stream has been reached 091 */ 092 @Override 093 public int read() { 094 final Integer value = blockingQueue.poll(); 095 return value == null ? EOF : ((0xFF) & value); 096 } 097 098}