/*
 * Decompiled with CFR 0.152.
 */
package org.refcodes.io;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import org.refcodes.component.CloseException;
import org.refcodes.component.ConnectionStatus;
import org.refcodes.component.OpenException;
import org.refcodes.controlflow.ControlFlowUtility;
import org.refcodes.controlflow.RetryTimeoutImpl;
import org.refcodes.data.DaemonLoopSleepTime;
import org.refcodes.data.RetryLoopCount;
import org.refcodes.exception.ExceptionUtility;
import org.refcodes.io.AbstractReceiver;
import org.refcodes.io.SerializableObjectInputStreamImpl;
import org.refcodes.mixin.Disposable;
import org.refcodes.mixin.Loggable;

public abstract class AbstractPrefetchInputStreamReceiver<DATA extends Serializable>
extends AbstractReceiver<DATA>
implements Loggable {
    private ObjectInputStream _objectInputStream = null;
    private ExecutorService _executorService;
    private IoStreamReceiverDaemon _ioStreamReceiverDaemon = null;
    private boolean _isDaemonAlive = false;

    public AbstractPrefetchInputStreamReceiver() {
        this(1024, null);
    }

    public AbstractPrefetchInputStreamReceiver(ExecutorService aExecutorService) {
        this(1024, aExecutorService);
    }

    public AbstractPrefetchInputStreamReceiver(int aQueueCapacity) {
        this(aQueueCapacity, null);
    }

    public AbstractPrefetchInputStreamReceiver(int aQueueCapacity, ExecutorService aExecutorService) {
        this._executorService = aExecutorService == null ? ControlFlowUtility.createCachedExecutorService((boolean)true) : ControlFlowUtility.toManagedExecutorService((ExecutorService)aExecutorService);
    }

    @Override
    public synchronized void close() throws CloseException {
        if (!this.isClosed()) {
            super.close();
            if (this._ioStreamReceiverDaemon != null) {
                this._ioStreamReceiverDaemon.dispose();
                this._ioStreamReceiverDaemon = null;
            }
            try {
                if (this._objectInputStream != null) {
                    this._objectInputStream.close();
                }
            }
            catch (IOException e) {
                throw new CloseException("Unable to close receiver, connection status is <" + this.getConnectionStatus() + ">.", (Throwable)e);
            }
        }
    }

    protected synchronized void open(InputStream aInputStream) throws OpenException {
        block5: {
            if (this.isOpened()) {
                throw new OpenException("Unable to open the connection is is is ALREADY OPEN; connection status is " + this.getConnectionStatus() + ".");
            }
            try {
                this._objectInputStream = !(aInputStream instanceof BufferedInputStream) ? new SerializableObjectInputStreamImpl(new BufferedInputStream(aInputStream)) : new SerializableObjectInputStreamImpl(aInputStream);
            }
            catch (IOException aException) {
                if (ExceptionUtility.isThrownAsOfAlreadyClosed((IOException)aException)) break block5;
                throw new OpenException("Unable to open the I/O stream receiver as of a causing exception.", (Throwable)aException);
            }
        }
        this._ioStreamReceiverDaemon = new IoStreamReceiverDaemon();
        this.setConnectionStatus(ConnectionStatus.OPENED);
        this._executorService.execute(this._ioStreamReceiverDaemon);
        if (!this._isDaemonAlive) {
            RetryTimeoutImpl theRetryTimeout = new RetryTimeoutImpl((long)DaemonLoopSleepTime.NORM.getMilliseconds(), RetryLoopCount.NORM_NUM_RETRY_LOOPS.getNumber().intValue());
            while (!this._isDaemonAlive) {
                theRetryTimeout.nextRetry();
            }
        }
    }

    protected boolean isOpenable(InputStream aInputStream) {
        if (aInputStream == null) {
            return false;
        }
        return !this.isOpened();
    }

    private class IoStreamReceiverDaemon
    implements Runnable,
    Disposable {
        private boolean _isDisposed = false;

        private IoStreamReceiverDaemon() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                Object eObject = null;
                AbstractPrefetchInputStreamReceiver.this._isDaemonAlive = true;
                while (!this._isDisposed && AbstractPrefetchInputStreamReceiver.this.isOpened()) {
                    try {
                        eObject = AbstractPrefetchInputStreamReceiver.this._objectInputStream.readObject();
                        AbstractPrefetchInputStreamReceiver.this.pushDatagram((Serializable)eObject);
                    }
                    catch (ClassCastException | ClassNotFoundException aException) {
                        AbstractPrefetchInputStreamReceiver.this.warn("Unable to read datagram from sender as of a causing exception; connection status is " + AbstractPrefetchInputStreamReceiver.this.getConnectionStatus(), aException);
                    }
                }
            }
            catch (IOException aException) {
                AbstractPrefetchInputStreamReceiver abstractPrefetchInputStreamReceiver = AbstractPrefetchInputStreamReceiver.this;
                synchronized (abstractPrefetchInputStreamReceiver) {
                    if (AbstractPrefetchInputStreamReceiver.this.isOpened()) {
                        try {
                            if (!(aException instanceof EOFException)) {
                                AbstractPrefetchInputStreamReceiver.this.warn("Unable to read datagram from sender as of a causing exception; connection status is " + AbstractPrefetchInputStreamReceiver.this.getConnectionStatus(), aException);
                            }
                            AbstractPrefetchInputStreamReceiver.this.close();
                        }
                        catch (CloseException e) {
                            AbstractPrefetchInputStreamReceiver.this.warn("Unable to close malfunctioning connection as of: " + ExceptionUtility.toMessage((Throwable)e), e);
                        }
                    }
                }
            }
            finally {
                AbstractPrefetchInputStreamReceiver.this._isDaemonAlive = false;
                AbstractPrefetchInputStreamReceiver.this.debug("Terminating I/O stream receiver daemon <" + this.getClass().getName() + ">.");
            }
        }

        public void dispose() {
            this._isDisposed = true;
        }
    }
}

