001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018// import javax.annotation.concurrent.GuardedBy;
019import java.io.EOFException;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InterruptedIOException;
023import java.nio.ByteBuffer;
024import java.util.Objects;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.locks.Condition;
030import java.util.concurrent.locks.ReentrantLock;
031
032/**
033 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount
034 * of data has been read from the current buffer. It does so by maintaining two buffers: an active buffer and a read
035 * ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read ahead
036 * buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted,
037 * we flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
038 * <p>
039 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
040 * </p>
041 *
042 * @since 2.9.0
043 */
044public class ReadAheadInputStream extends InputStream {
045
046    private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
047
048    /**
049     * Creates a new daemon executor service.
050     *
051     * @return a new daemon executor service.
052     */
053    private static ExecutorService newExecutorService() {
054        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread);
055    }
056
057    /**
058     * Creates a new daemon thread.
059     *
060     * @param r the thread's runnable.
061     * @return a new daemon thread.
062     */
063    private static Thread newThread(final Runnable r) {
064        final Thread thread = new Thread(r, "commons-io-read-ahead");
065        thread.setDaemon(true);
066        return thread;
067    }
068
069    private final ReentrantLock stateChangeLock = new ReentrantLock();
070
071    // @GuardedBy("stateChangeLock")
072    private ByteBuffer activeBuffer;
073
074    // @GuardedBy("stateChangeLock")
075    private ByteBuffer readAheadBuffer;
076
077    // @GuardedBy("stateChangeLock")
078    private boolean endOfStream;
079
080    // @GuardedBy("stateChangeLock")
081    // true if async read is in progress
082    private boolean readInProgress;
083
084    // @GuardedBy("stateChangeLock")
085    // true if read is aborted due to an exception in reading from underlying input stream.
086    private boolean readAborted;
087
088    // @GuardedBy("stateChangeLock")
089    private Throwable readException;
090
091    // @GuardedBy("stateChangeLock")
092    // whether the close method is called.
093    private boolean isClosed;
094
095    // @GuardedBy("stateChangeLock")
096    // true when the close method will close the underlying input stream. This is valid only if
097    // `isClosed` is true.
098    private boolean isUnderlyingInputStreamBeingClosed;
099
100    // @GuardedBy("stateChangeLock")
101    // whether there is a read ahead task running,
102    private boolean isReading;
103
104    // Whether there is a reader waiting for data.
105    private final AtomicBoolean isWaiting = new AtomicBoolean(false);
106
107    private final InputStream underlyingInputStream;
108
109    private final ExecutorService executorService;
110
111    private final boolean shutdownExecutorService;
112
113    private final Condition asyncReadComplete = stateChangeLock.newCondition();
114
115    /**
116     * Creates an instance with the specified buffer size and read-ahead threshold
117     *
118     * @param inputStream The underlying input stream.
119     * @param bufferSizeInBytes The buffer size.
120     */
121    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
122        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
123    }
124
125    /**
126     * Creates an instance with the specified buffer size and read-ahead threshold
127     *
128     * @param inputStream The underlying input stream.
129     * @param bufferSizeInBytes The buffer size.
130     * @param executorService An executor service for the read-ahead thread.
131     */
132    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes,
133        final ExecutorService executorService) {
134        this(inputStream, bufferSizeInBytes, executorService, false);
135    }
136
137    /**
138     * Creates an instance with the specified buffer size and read-ahead threshold
139     *
140     * @param inputStream The underlying input stream.
141     * @param bufferSizeInBytes The buffer size.
142     * @param executorService An executor service for the read-ahead thread.
143     * @param shutdownExecutorService Whether or not to shutdown the given ExecutorService on close.
144     */
145    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes,
146        final ExecutorService executorService, final boolean shutdownExecutorService) {
147        if (bufferSizeInBytes <= 0) {
148            throw new IllegalArgumentException(
149                "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
150        }
151        this.executorService = Objects.requireNonNull(executorService, "executorService");
152        this.underlyingInputStream = Objects.requireNonNull(inputStream, "inputStream");
153        this.shutdownExecutorService = shutdownExecutorService;
154        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
155        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
156        this.activeBuffer.flip();
157        this.readAheadBuffer.flip();
158    }
159
160    @Override
161    public int available() throws IOException {
162        stateChangeLock.lock();
163        // Make sure we have no integer overflow.
164        try {
165            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
166        } finally {
167            stateChangeLock.unlock();
168        }
169    }
170
171    private void checkReadException() throws IOException {
172        if (readAborted) {
173            if (readException instanceof IOException) {
174                throw (IOException) readException;
175            }
176            throw new IOException(readException);
177        }
178    }
179
180    @Override
181    public void close() throws IOException {
182        boolean isSafeToCloseUnderlyingInputStream = false;
183        stateChangeLock.lock();
184        try {
185            if (isClosed) {
186                return;
187            }
188            isClosed = true;
189            if (!isReading) {
190                // Nobody is reading, so we can close the underlying input stream in this method.
191                isSafeToCloseUnderlyingInputStream = true;
192                // Flip this to make sure the read ahead task will not close the underlying input stream.
193                isUnderlyingInputStreamBeingClosed = true;
194            }
195        } finally {
196            stateChangeLock.unlock();
197        }
198
199        if (shutdownExecutorService) {
200            try {
201                executorService.shutdownNow();
202                executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
203            } catch (final InterruptedException e) {
204                final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
205                iio.initCause(e);
206                throw iio;
207            } finally {
208                if (isSafeToCloseUnderlyingInputStream) {
209                    underlyingInputStream.close();
210                }
211            }
212        }
213    }
214
215    private void closeUnderlyingInputStreamIfNecessary() {
216        boolean needToCloseUnderlyingInputStream = false;
217        stateChangeLock.lock();
218        try {
219            isReading = false;
220            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
221                // close method cannot close underlyingInputStream because we were reading.
222                needToCloseUnderlyingInputStream = true;
223            }
224        } finally {
225            stateChangeLock.unlock();
226        }
227        if (needToCloseUnderlyingInputStream) {
228            try {
229                underlyingInputStream.close();
230            } catch (final IOException e) {
231                // TODO ?
232            }
233        }
234    }
235
236    private boolean isEndOfStream() {
237        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
238    }
239
240    @Override
241    public int read() throws IOException {
242        if (activeBuffer.hasRemaining()) {
243            // short path - just get one byte.
244            return activeBuffer.get() & 0xFF;
245        }
246        final byte[] oneByteArray = oneByte.get();
247        return read(oneByteArray, 0, 1) == EOF ? -1 : oneByteArray[0] & 0xFF;
248    }
249
250    @Override
251    public int read(final byte[] b, final int offset, int len) throws IOException {
252        if (offset < 0 || len < 0 || len > b.length - offset) {
253            throw new IndexOutOfBoundsException();
254        }
255        if (len == 0) {
256            return 0;
257        }
258
259        if (!activeBuffer.hasRemaining()) {
260            // No remaining in active buffer - lock and switch to write ahead buffer.
261            stateChangeLock.lock();
262            try {
263                waitForAsyncReadComplete();
264                if (!readAheadBuffer.hasRemaining()) {
265                    // The first read.
266                    readAsync();
267                    waitForAsyncReadComplete();
268                    if (isEndOfStream()) {
269                        return EOF;
270                    }
271                }
272                // Swap the newly read read ahead buffer in place of empty active buffer.
273                swapBuffers();
274                // After swapping buffers, trigger another async read for read ahead buffer.
275                readAsync();
276            } finally {
277                stateChangeLock.unlock();
278            }
279        }
280        len = Math.min(len, activeBuffer.remaining());
281        activeBuffer.get(b, offset, len);
282
283        return len;
284    }
285
286    /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
287    private void readAsync() throws IOException {
288        stateChangeLock.lock();
289        final byte[] arr;
290        try {
291            arr = readAheadBuffer.array();
292            if (endOfStream || readInProgress) {
293                return;
294            }
295            checkReadException();
296            readAheadBuffer.position(0);
297            readAheadBuffer.flip();
298            readInProgress = true;
299        } finally {
300            stateChangeLock.unlock();
301        }
302        executorService.execute(() -> {
303            stateChangeLock.lock();
304            try {
305                if (isClosed) {
306                    readInProgress = false;
307                    return;
308                }
309                // Flip this so that the close method will not close the underlying input stream when we
310                // are reading.
311                isReading = true;
312            } finally {
313                stateChangeLock.unlock();
314            }
315
316            // Please note that it is safe to release the lock and read into the read ahead buffer
317            // because either of following two conditions will hold:
318            //
319            // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
320            //
321            // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
322            // for this async read to complete.
323            //
324            // So there is no race condition in both the situations.
325            int read = 0;
326            int off = 0, len = arr.length;
327            Throwable exception = null;
328            try {
329                // try to fill the read ahead buffer.
330                // if a reader is waiting, possibly return early.
331                do {
332                    read = underlyingInputStream.read(arr, off, len);
333                    if (read <= 0) {
334                        break;
335                    }
336                    off += read;
337                    len -= read;
338                } while (len > 0 && !isWaiting.get());
339            } catch (final Throwable ex) {
340                exception = ex;
341                if (ex instanceof Error) {
342                    // `readException` may not be reported to the user. Rethrow Error to make sure at least
343                    // The user can see Error in UncaughtExceptionHandler.
344                    throw (Error) ex;
345                }
346            } finally {
347                stateChangeLock.lock();
348                try {
349                    readAheadBuffer.limit(off);
350                    if (read < 0 || (exception instanceof EOFException)) {
351                        endOfStream = true;
352                    } else if (exception != null) {
353                        readAborted = true;
354                        readException = exception;
355                    }
356                    readInProgress = false;
357                    signalAsyncReadComplete();
358                } finally {
359                    stateChangeLock.unlock();
360                }
361                closeUnderlyingInputStreamIfNecessary();
362            }
363        });
364    }
365
366    private void signalAsyncReadComplete() {
367        stateChangeLock.lock();
368        try {
369            asyncReadComplete.signalAll();
370        } finally {
371            stateChangeLock.unlock();
372        }
373    }
374
375    @Override
376    public long skip(final long n) throws IOException {
377        if (n <= 0L) {
378            return 0L;
379        }
380        if (n <= activeBuffer.remaining()) {
381            // Only skipping from active buffer is sufficient
382            activeBuffer.position((int) n + activeBuffer.position());
383            return n;
384        }
385        stateChangeLock.lock();
386        long skipped;
387        try {
388            skipped = skipInternal(n);
389        } finally {
390            stateChangeLock.unlock();
391        }
392        return skipped;
393    }
394
395    /**
396     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is
397     * already acquired in the caller before calling this function.
398     *
399     * @param n the number of bytes to be skipped.
400     * @return the actual number of bytes skipped.
401     */
402    private long skipInternal(final long n) throws IOException {
403        assert stateChangeLock.isLocked();
404        waitForAsyncReadComplete();
405        if (isEndOfStream()) {
406            return 0;
407        }
408        if (available() >= n) {
409            // we can skip from the internal buffers
410            int toSkip = (int) n;
411            // We need to skip from both active buffer and read ahead buffer
412            toSkip -= activeBuffer.remaining();
413            assert toSkip > 0; // skipping from activeBuffer already handled.
414            activeBuffer.position(0);
415            activeBuffer.flip();
416            readAheadBuffer.position(toSkip + readAheadBuffer.position());
417            swapBuffers();
418            // Trigger async read to emptied read ahead buffer.
419            readAsync();
420            return n;
421        }
422        final int skippedBytes = available();
423        final long toSkip = n - skippedBytes;
424        activeBuffer.position(0);
425        activeBuffer.flip();
426        readAheadBuffer.position(0);
427        readAheadBuffer.flip();
428        final long skippedFromInputStream = underlyingInputStream.skip(toSkip);
429        readAsync();
430        return skippedBytes + skippedFromInputStream;
431    }
432
433    /**
434     * Flips the active and read ahead buffer
435     */
436    private void swapBuffers() {
437        final ByteBuffer temp = activeBuffer;
438        activeBuffer = readAheadBuffer;
439        readAheadBuffer = temp;
440    }
441
442    private void waitForAsyncReadComplete() throws IOException {
443        stateChangeLock.lock();
444        try {
445            isWaiting.set(true);
446            // There is only one reader, and one writer, so the writer should signal only once,
447            // but a while loop checking the wake up condition is still needed to avoid spurious wakeups.
448            while (readInProgress) {
449                asyncReadComplete.await();
450            }
451        } catch (final InterruptedException e) {
452            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
453            iio.initCause(e);
454            throw iio;
455        } finally {
456            isWaiting.set(false);
457            stateChangeLock.unlock();
458        }
459        checkReadException();
460    }
461}