/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/**
 * Tests for {@link AsynchronousBufferFileWriter}.
 *
 * <p>Every test writes through a writer that is created on a channel of the shared
 * {@link IOManager} together with a {@link RequestQueue} instantiated by the test itself.
 * Requests are added with {@code writeBlock} and their completion is simulated by the tests
 * calling {@code handleProcessedBuffer} directly.
 */
public class AsynchronousBufferFileWriterTest {
    /** Number of add/handle/subscribe rounds used to provoke the subscription race. */
    private static final int CONCURRENT_ROUNDS = 50000;

    /** I/O manager shared by all tests; closed once in {@link #shutdown()}. */
    private static final IOManager ioManager = new IOManagerAsync();

    /** Mocked buffer used as the payload of every simulated request. */
    private static final Buffer mockBuffer = Mockito.mock(Buffer.class);

    @Rule
    public ExpectedException exception = ExpectedException.none();

    /** Writer under test; re-created before each test in {@link #setUp()}. */
    private AsynchronousBufferFileWriter writer;

    /** Closes the shared I/O manager after all tests of this class have run. */
    @AfterClass
    public static void shutdown() throws Exception {
        ioManager.close();
    }

    /** Creates a fresh writer on a new channel with its own request queue. */
    @Before
    public void setUp() throws IOException {
        writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue());
    }

    /**
     * Adding a request must raise the number of outstanding requests to one and handling it
     * must bring the number back to zero.
     */
    @Test
    public void testAddAndHandleRequest() throws Exception {
        addRequest();
        Assert.assertEquals("Didn't increment number of outstanding requests.", 1, writer.getNumberOfOutstandingRequests());

        handleRequest();
        Assert.assertEquals("Didn't decrement number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
    }

    /**
     * Writing to an already closed writer is expected to throw an {@link IOException}, to
     * recycle the passed buffer, and to leave the number of outstanding requests at zero.
     */
    @Test
    public void testAddWithFailingWriter() throws Exception {
        // A dedicated instance (not the field) so that closing it does not affect other state.
        AsynchronousBufferFileWriter closedWriter =
                new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue());
        closedWriter.close();

        exception.expect(IOException.class);

        NetworkBuffer buffer = new NetworkBuffer(
                MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            closedWriter.writeBlock(buffer);
        } finally {
            if (!buffer.isRecycled()) {
                // Release the buffer ourselves before reporting the failure.
                buffer.recycleBuffer();
                Assert.fail("buffer not recycled");
            }
            Assert.assertEquals("Shouln't increment number of outstanding requests.", 0, closedWriter.getNumberOfOutstandingRequests());
        }
    }

    /**
     * Registering a listener must be rejected while no request is outstanding, accepted once
     * a request has been added, and the listener must be notified once that request is handled.
     */
    @Test
    public void testSubscribe() throws Exception {
        TestNotificationListener listener = new TestNotificationListener();

        Assert.assertFalse("Allowed to subscribe w/o any outstanding requests.", writer.registerAllRequestsProcessedListener(listener));

        addRequest();
        Assert.assertTrue("Didn't allow to subscribe.", writer.registerAllRequestsProcessedListener(listener));

        handleRequest();
        Assert.assertEquals("Listener was not notified.", 1, listener.getNumberOfNotifications());
    }

    /**
     * Closes the writer on a separate thread while two requests are outstanding and handles
     * both requests on the test thread. The registered listener must be notified exactly once
     * and the asynchronous close must not have thrown.
     */
    @Test
    public void testSubscribeAndClose() throws IOException, InterruptedException {
        TestNotificationListener listener = new TestNotificationListener();
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
        final CountDownLatch closeFinished = new CountDownLatch(1);

        addRequest();
        addRequest();
        writer.registerAllRequestsProcessedListener(listener);

        Thread asyncCloseThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    writer.close();
                } catch (Throwable t) {
                    // Handed over to the test thread, which checks it after the latch.
                    error.set(t);
                } finally {
                    closeFinished.countDown();
                }
            }
        });
        asyncCloseThread.start();

        handleRequest();
        handleRequest();

        closeFinished.await();

        // Previously the captured error was never inspected, hiding failures of close().
        Assert.assertNull("Asynchronous close failed: " + error.get(), error.get());
        Assert.assertEquals("Listener was not notified.", 1, listener.getNumberOfNotifications());
    }

    /**
     * Repeatedly races a listener registration against the handling of a single outstanding
     * request on a two-thread pool. In every round a successful registration must result in
     * exactly one notification and a rejected registration in none.
     */
    @Test
    public void testConcurrentSubscribeAndHandleRequest() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        final TestNotificationListener listener = new TestNotificationListener();

        Callable<Boolean> subscriber = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return writer.registerAllRequestsProcessedListener(listener);
            }
        };
        Callable<Void> requestHandler = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                handleRequest();
                return null;
            }
        };

        try {
            for (int i = 0; i < CONCURRENT_ROUNDS; ++i) {
                listener.reset();
                addRequest();

                Future<Void> handleRequestFuture = executor.submit(requestHandler);
                Future<Boolean> subscribeFuture = executor.submit(subscriber);

                handleRequestFuture.get();

                try {
                    if (subscribeFuture.get()) {
                        Assert.assertEquals("Race: Successfully subscribed, but was never notified.", 1, listener.getNumberOfNotifications());
                    } else {
                        Assert.assertEquals("Race: Never subscribed successfully, but was notified.", 0, listener.getNumberOfNotifications());
                    }
                } catch (Throwable t) {
                    // Keep the original failure as the cause and report the failing round.
                    throw new AssertionError("Iteration " + i + ": " + t.getMessage(), t);
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /** Adds one request to the writer under test by writing the mocked buffer. */
    private void addRequest() throws IOException {
        writer.writeBlock(mockBuffer);
    }

    /** Simulates the successful completion (no exception) of one request for the mocked buffer. */
    private void handleRequest() {
        writer.handleProcessedBuffer(mockBuffer, null);
    }
}

