package rx.operators;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.helpers.UtilLoggingLevel;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import rx.Observer;
import rx.Subscription;

/* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver.class */
public final class SynchronizedObserver<T> implements Observer<T> {
    /** Downstream observer that every accepted event is forwarded to. */
    private final Observer<? super T> observer;
    /** Subscription whose {@code isUnsubscribed()} state is checked before any event is forwarded. */
    private final SafeObservableSubscription subscription;
    /** Raised by {@code onError}/{@code onCompleted} before they take the lock; once set, {@code onNext} drops values. */
    private volatile boolean finishRequested;
    /** Raised inside the lock after a terminal event has been handed to {@link #observer}. */
    private volatile boolean finished;
    /** Monitor held around every forwarded call; this instance itself or an object supplied by the caller. */
    private volatile Object lock;

    /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest.class */
    public static class UnitTest {

        /** Mock observer for the single-threaded test; populated by the {@code initMocks(this)} call in {@code before()}. */
        @Mock
        Observer<String> aObserver;

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$BusyObserver.class */
        /**
         * Deliberately slow observer used by the multi-threaded tests.
         *
         * <p>It counts {@code onNext} calls, remembers whether a terminal event arrived, and tracks the
         * highest number of threads that were inside any of its callbacks at the same moment. The tests
         * assert that this maximum is 1 when the observer is wrapped in a {@code SynchronizedObserver}.
         */
        private static class BusyObserver implements Observer<String> {
            volatile boolean onCompleted;
            volatile boolean onError;
            AtomicInteger onNextCount;
            AtomicInteger threadsRunning;
            AtomicInteger maxConcurrentThreads;

            private BusyObserver() {
                this.onCompleted = false;
                this.onError = false;
                this.onNextCount = new AtomicInteger();
                this.threadsRunning = new AtomicInteger();
                this.maxConcurrentThreads = new AtomicInteger();
            }

            /** Records completion and samples the current callback concurrency. */
            @Override
            public void onCompleted() {
                this.threadsRunning.incrementAndGet();
                System.out.println(">>> BusyObserver received onCompleted");
                this.onCompleted = true;
                recordConcurrency();
                this.threadsRunning.decrementAndGet();
            }

            /**
             * Records the error and samples the current callback concurrency.
             *
             * @param th the error reported by the source
             */
            @Override
            public void onError(Throwable th) {
                this.threadsRunning.incrementAndGet();
                System.out.println(">>> BusyObserver received onError: " + th.getMessage());
                this.onError = true;
                recordConcurrency();
                this.threadsRunning.decrementAndGet();
            }

            /**
             * Counts the value, then sleeps for 200 ms so that concurrent callers would overlap if the
             * wrapper under test failed to serialize them.
             *
             * @param str the emitted value (only logged)
             */
            @Override
            public void onNext(String str) {
                this.threadsRunning.incrementAndGet();
                try {
                    this.onNextCount.incrementAndGet();
                    System.out.println(">>> BusyObserver received onNext: " + str);
                    try {
                        Thread.sleep(200L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } finally {
                    recordConcurrency();
                    this.threadsRunning.decrementAndGet();
                }
            }

            /**
             * Raises {@link #maxConcurrentThreads} to the number of threads currently inside a callback
             * if that number is a new maximum.
             *
             * <p>The compare-and-set is retried: a single attempt can lose a race against another
             * thread publishing a smaller value, which would hide a genuine overlap from the tests.
             */
            private void recordConcurrency() {
                int running = this.threadsRunning.get();
                int recorded = this.maxConcurrentThreads.get();
                while (running > recorded && !this.maxConcurrentThreads.compareAndSet(recorded, running)) {
                    recorded = this.maxConcurrentThreads.get();
                }
            }
        }

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$CompletionThread.class */
        /**
         * Task that optionally waits for a set of futures and then sends exactly one terminal event
         * ({@code onError} or {@code onCompleted}) to the given observer.
         */
        public static class CompletionThread implements Runnable {
            private final Observer<String> target;
            private final TestConcurrencyObserverEvent event;
            private final Future<?>[] waitOnThese;

            /**
             * @param observer observer that receives the terminal event
             * @param testConcurrencyObserverEvent which terminal event to send; must be {@code onError} or {@code onCompleted}
             * @param futureArr futures to wait on before sending the event
             */
            CompletionThread(Observer<String> observer, TestConcurrencyObserverEvent testConcurrencyObserverEvent, Future<?>... futureArr) {
                this.target = observer;
                this.event = testConcurrencyObserverEvent;
                this.waitOnThese = futureArr;
            }

            /**
             * Waits for every future (failures are only reported on stderr), then emits the event.
             *
             * @throws IllegalArgumentException if the configured event is not a terminal event
             */
            @Override
            public void run() {
                if (waitOnThese != null) {
                    for (int idx = 0; idx < waitOnThese.length; idx++) {
                        try {
                            waitOnThese[idx].get();
                        } catch (Throwable failure) {
                            System.err.println("Error while waiting on future in CompletionThread");
                        }
                    }
                }

                if (event == TestConcurrencyObserverEvent.onError) {
                    target.onError(new RuntimeException("mocked exception"));
                    return;
                }
                if (event == TestConcurrencyObserverEvent.onCompleted) {
                    target.onCompleted();
                    return;
                }
                throw new IllegalArgumentException("Expecting either onError or onCompleted");
            }
        }

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$ExternalBusyThread.class */
        /**
         * Thread that repeatedly grabs an external lock and checks that the observer's state does not
         * change while the lock is held. Any change sets {@link #fail} and stops the thread.
         */
        private static class ExternalBusyThread extends Thread {
            private BusyObserver observer;
            private Object lock;
            private int lockTimes;
            private int waitTime;
            public volatile boolean fail = false;

            /**
             * @param busyObserver observer whose counters are sampled
             * @param obj monitor to hold while sampling
             * @param i number of times the lock is taken
             * @param i2 exclusive upper bound, in milliseconds, of the random hold time
             */
            public ExternalBusyThread(BusyObserver busyObserver, Object obj, int i, int i2) {
                this.observer = busyObserver;
                this.lock = obj;
                this.lockTimes = i;
                this.waitTime = i2;
            }

            /** Runs the sampling rounds; returns early on the first detected state change. */
            @Override
            public void run() {
                final Random sleepSource = new Random();
                int round = 0;
                while (round < lockTimes) {
                    synchronized (lock) {
                        // Snapshot taken right after acquiring the lock.
                        final int countBefore = observer.onNextCount.get();
                        final boolean completedBefore = observer.onCompleted;
                        final boolean errorBefore = observer.onError;

                        try {
                            Thread.sleep(sleepSource.nextInt(waitTime));
                        } catch (InterruptedException e) {
                        }

                        // Snapshot taken just before releasing the lock.
                        final int countAfter = observer.onNextCount.get();
                        final boolean completedAfter = observer.onCompleted;
                        final boolean errorAfter = observer.onError;

                        String violation = null;
                        if (countBefore != countAfter) {
                            violation = ">>> ExternalBusyThread received different onNextCount: " + countBefore + " -> " + countAfter;
                        } else if (completedBefore != completedAfter) {
                            violation = ">>> ExternalBusyThread received different onCompleted: " + completedBefore + " -> " + completedAfter;
                        } else if (errorBefore != errorAfter) {
                            violation = ">>> ExternalBusyThread received different onError: " + errorBefore + " -> " + errorAfter;
                        }

                        if (violation != null) {
                            System.out.println(violation);
                            fail = true;
                            return;
                        }
                    }
                    round++;
                }
            }
        }

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$OnNextThread.class */
        /** Task that pushes the string {@code "aString"} to an observer a fixed number of times. */
        public static class OnNextThread implements Runnable {
            private final Observer<String> sink;
            private final int numStringsToSend;

            /**
             * @param observer observer that receives the values
             * @param i how many values to send
             */
            OnNextThread(Observer<String> observer, int i) {
                this.sink = observer;
                this.numStringsToSend = i;
            }

            /** Sends the configured number of values back to back. */
            @Override
            public void run() {
                int remaining = numStringsToSend;
                while (remaining > 0) {
                    sink.onNext("aString");
                    remaining--;
                }
            }
        }

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$TestConcurrencyObserver.class */
        /**
         * Observer that records the kind of every event it receives, in arrival order, so the sequence
         * can be validated afterwards with {@link #assertEvents}.
         */
        private static class TestConcurrencyObserver implements Observer<String> {
            private final LinkedBlockingQueue<TestConcurrencyObserverEvent> events = new LinkedBlockingQueue<TestConcurrencyObserverEvent>();
            private final int waitTime;

            /**
             * @param i milliseconds to sleep inside each {@code onNext}; values {@code <= 0} disable the sleep
             */
            public TestConcurrencyObserver(int i) {
                this.waitTime = i;
            }

            /** Creates an observer that does not sleep in {@code onNext}. */
            public TestConcurrencyObserver() {
                this(0);
            }

            @Override
            public void onCompleted() {
                events.add(TestConcurrencyObserverEvent.onCompleted);
            }

            @Override
            public void onError(Throwable th) {
                events.add(TestConcurrencyObserverEvent.onError);
            }

            /** Records the event, spins through a short arithmetic loop, then optionally sleeps. */
            @Override
            public void onNext(String str) {
                events.add(TestConcurrencyObserverEvent.onNext);

                // Small amount of throwaway arithmetic; the result is never used.
                int scratch = 0;
                int step = 0;
                while (step < 20) {
                    scratch += scratch * step;
                    step++;
                }

                if (waitTime <= 0) {
                    return;
                }
                try {
                    Thread.sleep(waitTime);
                } catch (InterruptedException e) {
                }
            }

            /**
             * Validates the recorded sequence: nothing may follow a terminal event, and the terminal
             * event, if any, must match the expected one.
             *
             * @param testConcurrencyObserverEvent expected terminal event, or {@code null} to accept either
             * @return the number of {@code onNext} events recorded
             * @throws IllegalStateException if the sequence violates the rules above
             */
            public int assertEvents(TestConcurrencyObserverEvent testConcurrencyObserverEvent) throws IllegalStateException {
                int nextCount = 0;
                boolean terminated = false;
                for (TestConcurrencyObserverEvent recorded : events) {
                    switch (recorded) {
                        case onNext:
                            if (terminated) {
                                throw new IllegalStateException("Received onNext but we're already finished.");
                            }
                            nextCount++;
                            break;
                        case onError:
                            if (terminated) {
                                throw new IllegalStateException("Received onError but we're already finished.");
                            }
                            if (testConcurrencyObserverEvent != null && TestConcurrencyObserverEvent.onError != testConcurrencyObserverEvent) {
                                throw new IllegalStateException("Received onError ending event but expected " + testConcurrencyObserverEvent);
                            }
                            terminated = true;
                            break;
                        case onCompleted:
                            if (terminated) {
                                throw new IllegalStateException("Received onCompleted but we're already finished.");
                            }
                            if (testConcurrencyObserverEvent != null && TestConcurrencyObserverEvent.onCompleted != testConcurrencyObserverEvent) {
                                throw new IllegalStateException("Received onCompleted ending event but expected " + testConcurrencyObserverEvent);
                            }
                            terminated = true;
                            break;
                        default:
                            break;
                    }
                }
                return nextCount;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$TestConcurrencyObserverEvent.class */
        /** Kinds of observer callback recorded by the concurrency test helpers. */
        public enum TestConcurrencyObserverEvent {
            /** The {@code onCompleted()} callback. */
            onCompleted,
            /** The {@code onError(Throwable)} callback. */
            onError,
            /** The {@code onNext(String)} callback. */
            onNext
        }

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$TestMultiThreadedObservable.class */
        /**
         * Test source that emits each configured value from its own pool thread, so the subscribed
         * observer is called concurrently unless something serializes the calls.
         *
         * <p>A {@code null} entry in the values makes the emitting task report a
         * {@link NullPointerException} through {@code onError}. After all tasks have been submitted the
         * pool is shut down, the coordinating thread waits up to two seconds for it to drain, and then
         * calls {@code onCompleted}.
         */
        private static class TestMultiThreadedObservable implements Observable.OnSubscribeFunc<String> {
            final Subscription s;
            final String[] values;
            Thread t = null;
            AtomicInteger threadsRunning = new AtomicInteger();
            AtomicInteger maxConcurrentThreads = new AtomicInteger();
            ExecutorService threadPool = Executors.newCachedThreadPool();

            /**
             * @param subscription subscription returned from {@link #onSubscribe}
             * @param strArr values to emit; a {@code null} element triggers an error
             */
            public TestMultiThreadedObservable(Subscription subscription, String... strArr) {
                this.s = subscription;
                this.values = strArr;
            }

            /**
             * Starts the coordinating thread and returns immediately.
             *
             * @param observer observer to emit to
             * @return the subscription passed to the constructor
             */
            @Override
            public Subscription onSubscribe(final Observer<? super String> observer) {
                System.out.println("TestMultiThreadedObservable subscribed to ...");
                this.t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("running TestMultiThreadedObservable thread");
                            for (final String value : values) {
                                threadPool.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        emit(observer, value);
                                    }
                                });
                            }
                            // No more tasks will be submitted.
                            threadPool.shutdown();
                        } catch (Throwable e) {
                            throw new RuntimeException(e);
                        }

                        // Let the emitting tasks drain before signalling completion.
                        try {
                            threadPool.awaitTermination(2L, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        observer.onCompleted();
                    }
                });
                System.out.println("starting TestMultiThreadedObservable thread");
                this.t.start();
                System.out.println("done starting TestMultiThreadedObservable thread");
                return this.s;
            }

            /**
             * Delivers one value on the calling pool thread, routing any failure to {@code onError}
             * and always releasing this task's slot in {@link #threadsRunning}.
             */
            private void emit(Observer<? super String> observer, String value) {
                threadsRunning.incrementAndGet();
                try {
                    System.out.println("TestMultiThreadedObservable onNext: " + value);
                    if (value == null) {
                        // A null value is how the tests force an error.
                        throw new NullPointerException();
                    }
                    observer.onNext(value);
                    recordConcurrency();
                } catch (Throwable e) {
                    observer.onError(e);
                } finally {
                    threadsRunning.decrementAndGet();
                }
            }

            /**
             * Raises {@link #maxConcurrentThreads} to the current number of running tasks when that is
             * a new maximum; the compare-and-set is retried so a lost race cannot drop the value.
             */
            private void recordConcurrency() {
                int running = threadsRunning.get();
                int recorded = maxConcurrentThreads.get();
                while (running > recorded && !maxConcurrentThreads.compareAndSet(recorded, running)) {
                    recorded = maxConcurrentThreads.get();
                }
            }

            /** Blocks until the coordinating thread started by {@link #onSubscribe} has finished. */
            public void waitToFinish() {
                try {
                    this.t.join();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/SynchronizedObserver$UnitTest$TestSingleThreadedObservable.class */
        /**
         * Test source that emits all configured values, followed by {@code onCompleted}, from one
         * dedicated thread.
         */
        private static class TestSingleThreadedObservable implements Observable.OnSubscribeFunc<String> {
            final Subscription s;
            final String[] values;
            private Thread t = null;

            /**
             * @param subscription subscription returned from {@link #onSubscribe}
             * @param strArr values to emit, in order
             */
            public TestSingleThreadedObservable(Subscription subscription, String... strArr) {
                this.s = subscription;
                this.values = strArr;
            }

            /**
             * Starts the emitting thread and returns immediately.
             *
             * @param observer observer to emit to
             * @return the subscription passed to the constructor
             */
            @Override
            public Subscription onSubscribe(final Observer<? super String> observer) {
                System.out.println("TestSingleThreadedObservable subscribed to ...");

                final Runnable emitter = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("running TestSingleThreadedObservable thread");
                            final String[] toEmit = values;
                            for (int idx = 0; idx < toEmit.length; idx++) {
                                final String value = toEmit[idx];
                                System.out.println("TestSingleThreadedObservable onNext: " + value);
                                observer.onNext(value);
                            }
                            observer.onCompleted();
                        } catch (Throwable failure) {
                            throw new RuntimeException(failure);
                        }
                    }
                };
                t = new Thread(emitter);

                System.out.println("starting TestSingleThreadedObservable thread");
                t.start();
                System.out.println("done starting TestSingleThreadedObservable thread");
                return s;
            }

            /** Blocks until the emitting thread has finished. */
            public void waitToFinish() {
                try {
                    t.join();
                } catch (InterruptedException interrupted) {
                    throw new RuntimeException(interrupted);
                }
            }
        }

        /** Runs before every test: has Mockito process the annotations on this instance (the {@code @Mock} field). */
        @Before
        public void before() {
            MockitoAnnotations.initMocks(this);
        }

        /**
         * One producer thread emits three values and completes; each value must reach the mock once,
         * {@code onCompleted} once, and {@code onError} never.
         */
        @Test
        public void testSingleThreadedBasic() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestSingleThreadedObservable producer = new TestSingleThreadedObservable(upstream, "one", "two", "three");
            final Observable<String> source = Observable.create(producer);
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(aObserver, new SafeObservableSubscription(upstream));

            source.subscribe(synced);
            producer.waitToFinish();

            Mockito.verify(aObserver, Mockito.times(1)).onNext("one");
            Mockito.verify(aObserver, Mockito.times(1)).onNext("two");
            Mockito.verify(aObserver, Mockito.times(1)).onNext("three");
            Mockito.verify(aObserver, Mockito.never()).onError(Mockito.any(Throwable.class));
            Mockito.verify(aObserver, Mockito.times(1)).onCompleted();
        }

        /**
         * Three values are emitted from separate threads; the wrapped observer must see all of them,
         * then completion, and never be entered by more than one thread at a time.
         */
        @Test
        public void testMultiThreadedBasic() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestMultiThreadedObservable producer = new TestMultiThreadedObservable(upstream, "one", "two", "three");
            final Observable<String> source = Observable.create(producer);
            final SafeObservableSubscription safeSubscription = new SafeObservableSubscription(upstream);
            final BusyObserver busy = new BusyObserver();
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(busy, safeSubscription);

            source.subscribe(synced);
            producer.waitToFinish();

            Assert.assertEquals(3L, busy.onNextCount.get());
            Assert.assertFalse(busy.onError);
            Assert.assertTrue(busy.onCompleted);
            // The producer really was concurrent ...
            Assert.assertTrue(producer.maxConcurrentThreads.get() > 1);
            // ... but the observer was only ever entered by one thread.
            Assert.assertEquals(1L, busy.maxConcurrentThreads.get());
        }

        /**
         * Same as the basic multi-threaded test, but the wrapper synchronizes on an external lock that
         * a second thread also holds periodically; the observer's state must not change while that
         * thread holds the lock.
         */
        @Test
        public void testMultiThreadedBasicWithLock() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestMultiThreadedObservable producer = new TestMultiThreadedObservable(upstream, "one", "two", "three");
            final Observable<String> source = Observable.create(producer);
            final SafeObservableSubscription safeSubscription = new SafeObservableSubscription(upstream);
            final BusyObserver busy = new BusyObserver();

            final Object sharedLock = new Object();
            final ExternalBusyThread lockHolder = new ExternalBusyThread(busy, sharedLock, 10, 100);
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(busy, safeSubscription, sharedLock);

            lockHolder.start();
            source.subscribe(synced);
            producer.waitToFinish();

            try {
                lockHolder.join(10000L);
                Assert.assertFalse(lockHolder.isAlive());
                Assert.assertFalse(lockHolder.fail);
            } catch (InterruptedException e) {
            }

            Assert.assertEquals(3L, busy.onNextCount.get());
            Assert.assertFalse(busy.onError);
            Assert.assertTrue(busy.onCompleted);
            Assert.assertTrue(producer.maxConcurrentThreads.get() > 1);
            Assert.assertEquals(1L, busy.maxConcurrentThreads.get());
        }

        /**
         * A trailing {@code null} value makes the producer report an error; the observer must receive
         * {@code onError}, no {@code onCompleted}, fewer than four values, and no concurrent calls.
         */
        @Test
        public void testMultiThreadedWithNPE() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestMultiThreadedObservable producer = new TestMultiThreadedObservable(upstream, "one", "two", "three", null);
            final Observable<String> source = Observable.create(producer);
            final SafeObservableSubscription safeSubscription = new SafeObservableSubscription(upstream);
            final BusyObserver busy = new BusyObserver();
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(busy, safeSubscription);

            source.subscribe(synced);
            producer.waitToFinish();

            System.out.println("maxConcurrentThreads: " + producer.maxConcurrentThreads.get());

            Assert.assertTrue(busy.onNextCount.get() < 4);
            Assert.assertTrue(busy.onError);
            Assert.assertFalse(busy.onCompleted);
            Assert.assertTrue(producer.maxConcurrentThreads.get() > 1);
            Assert.assertEquals(1L, busy.maxConcurrentThreads.get());
        }

        /**
         * Error scenario with a trailing {@code null} value, run with an external lock that a second
         * thread holds periodically while checking that the observer's state stays put.
         */
        @Test
        public void testMultiThreadedWithNPEAndLock() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestMultiThreadedObservable producer = new TestMultiThreadedObservable(upstream, "one", "two", "three", null);
            final Observable<String> source = Observable.create(producer);
            final SafeObservableSubscription safeSubscription = new SafeObservableSubscription(upstream);
            final BusyObserver busy = new BusyObserver();

            final Object sharedLock = new Object();
            final ExternalBusyThread lockHolder = new ExternalBusyThread(busy, sharedLock, 10, 100);
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(busy, safeSubscription, sharedLock);

            lockHolder.start();
            source.subscribe(synced);
            producer.waitToFinish();

            try {
                lockHolder.join(10000L);
                Assert.assertFalse(lockHolder.isAlive());
                Assert.assertFalse(lockHolder.fail);
            } catch (InterruptedException e) {
            }

            System.out.println("maxConcurrentThreads: " + producer.maxConcurrentThreads.get());

            Assert.assertTrue(busy.onNextCount.get() < 4);
            Assert.assertTrue(busy.onError);
            Assert.assertFalse(busy.onCompleted);
            Assert.assertTrue(producer.maxConcurrentThreads.get() > 1);
            Assert.assertEquals(1L, busy.maxConcurrentThreads.get());
        }

        /**
         * A {@code null} value in the middle of nine real values makes the producer report an error;
         * the observer must see {@code onError}, no {@code onCompleted}, fewer than nine values, and
         * no concurrent calls.
         */
        @Test
        public void testMultiThreadedWithNPEinMiddle() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestMultiThreadedObservable producer = new TestMultiThreadedObservable(upstream, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
            final Observable<String> source = Observable.create(producer);
            final SafeObservableSubscription safeSubscription = new SafeObservableSubscription(upstream);
            final BusyObserver busy = new BusyObserver();
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(busy, safeSubscription);

            source.subscribe(synced);
            producer.waitToFinish();

            System.out.println("maxConcurrentThreads: " + producer.maxConcurrentThreads.get());
            System.out.println("onNext count: " + busy.onNextCount.get());

            Assert.assertTrue(busy.onNextCount.get() < 9);
            Assert.assertTrue(busy.onError);
            Assert.assertFalse(busy.onCompleted);
            Assert.assertTrue(producer.maxConcurrentThreads.get() > 1);
            Assert.assertEquals(1L, busy.maxConcurrentThreads.get());
        }

        /**
         * Mid-stream error scenario run with an external lock that a second thread holds periodically
         * while checking that the observer's state does not change underneath it.
         */
        @Test
        public void testMultiThreadedWithNPEinMiddleAndLock() {
            final Subscription upstream = Mockito.mock(Subscription.class);
            final TestMultiThreadedObservable producer = new TestMultiThreadedObservable(upstream, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
            final Observable<String> source = Observable.create(producer);
            final SafeObservableSubscription safeSubscription = new SafeObservableSubscription(upstream);
            final BusyObserver busy = new BusyObserver();

            final Object sharedLock = new Object();
            final ExternalBusyThread lockHolder = new ExternalBusyThread(busy, sharedLock, 10, 100);
            final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(busy, safeSubscription, sharedLock);

            lockHolder.start();
            source.subscribe(synced);
            producer.waitToFinish();

            try {
                lockHolder.join(10000L);
                Assert.assertFalse(lockHolder.isAlive());
                Assert.assertFalse(lockHolder.fail);
            } catch (InterruptedException e) {
            }

            System.out.println("maxConcurrentThreads: " + producer.maxConcurrentThreads.get());
            System.out.println("onNext count: " + busy.onNextCount.get());

            Assert.assertTrue(busy.onNextCount.get() < 9);
            Assert.assertTrue(busy.onError);
            Assert.assertFalse(busy.onCompleted);
            Assert.assertTrue(producer.maxConcurrentThreads.get() > 1);
            Assert.assertEquals(1L, busy.maxConcurrentThreads.get());
        }

        /**
         * Stress test: eight threads push values through one {@code SynchronizedObserver} while nine
         * other tasks try to terminate it with {@code onCompleted} or {@code onError}. The recorded
         * sequence must contain at most one terminal event and nothing after it.
         *
         * <p>Any failure is printed with its stack trace and then reported through {@code Assert.fail};
         * the pool is always shut down and given five seconds to drain.
         */
        @Test
        public void runConcurrencyTest() {
            final ExecutorService pool = Executors.newFixedThreadPool(20);
            try {
                final TestConcurrencyObserver recorder = new TestConcurrencyObserver();
                final SynchronizedObserver<String> synced = new SynchronizedObserver<String>(recorder, new SafeObservableSubscription());

                // Plain literals: these are value counts, unrelated to any logging-level constant.
                final Future<?> f1 = pool.submit(new OnNextThread(synced, 12000));
                final Future<?> f2 = pool.submit(new OnNextThread(synced, 5000));
                final Future<?> f3 = pool.submit(new OnNextThread(synced, 75000));
                final Future<?> f4 = pool.submit(new OnNextThread(synced, 13500));
                final Future<?> f5 = pool.submit(new OnNextThread(synced, 22000));
                final Future<?> f6 = pool.submit(new OnNextThread(synced, 15000));
                final Future<?> f7 = pool.submit(new OnNextThread(synced, 7500));
                final Future<?> f8 = pool.submit(new OnNextThread(synced, 23500));

                final Future<?> f10 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onCompleted, f1, f2, f3, f4));
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    // Ignored: the pause only staggers the completion tasks.
                }
                final Future<?> f11 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                final Future<?> f12 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                final Future<?> f13 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                final Future<?> f14 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onCompleted, f4, f6, f7));
                final Future<?> f15 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                final Future<?> f16 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                final Future<?> f17 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));
                final Future<?> f18 = pool.submit(new CompletionThread(synced, TestConcurrencyObserverEvent.onError, f1, f2, f3, f4));

                waitOnThreads(f1, f2, f3, f4, f5, f6, f7, f8, f10, f11, f12, f13, f14, f15, f16, f17, f18);

                // null: either terminal event is acceptable, as long as there is at most one.
                recorder.assertEvents(null);
            } catch (Throwable failure) {
                // Print first: Assert.fail throws, so anything after it would never run.
                failure.printStackTrace();
                Assert.fail("Concurrency test failed: " + failure.getMessage());
            } finally {
                pool.shutdown();
                try {
                    pool.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Waits up to ten seconds for each future in turn. A failure or timeout is reported on stderr
         * with its stack trace and does not stop the remaining waits.
         *
         * @param futureArr futures to wait on, in order
         */
        private static void waitOnThreads(Future<?>... futureArr) {
            int idx = 0;
            while (idx < futureArr.length) {
                final Future<?> pending = futureArr[idx];
                try {
                    pending.get(10L, TimeUnit.SECONDS);
                } catch (Throwable failure) {
                    System.err.println("Failed while waiting on future.");
                    failure.printStackTrace();
                }
                idx++;
            }
        }
    }

    /**
     * Creates a wrapper that synchronizes on itself.
     *
     * @param observer observer that receives the serialized events
     * @param safeObservableSubscription subscription checked for unsubscription before each event
     */
    public SynchronizedObserver(Observer<? super T> observer, SafeObservableSubscription safeObservableSubscription) {
        this.observer = observer;
        this.subscription = safeObservableSubscription;
        // No external monitor supplied: this instance is its own lock.
        this.lock = this;
        this.finished = false;
        this.finishRequested = false;
    }

    /**
     * Creates a wrapper that synchronizes on a caller-supplied monitor.
     *
     * @param observer observer that receives the serialized events
     * @param safeObservableSubscription subscription checked for unsubscription before each event
     * @param obj monitor to hold while forwarding events
     */
    public SynchronizedObserver(Observer<? super T> observer, SafeObservableSubscription safeObservableSubscription, Object obj) {
        this.observer = observer;
        this.subscription = safeObservableSubscription;
        this.lock = obj;
        this.finished = false;
        this.finishRequested = false;
    }

    /**
     * Creates a wrapper with a fresh {@code SafeObservableSubscription}, delegating to the
     * two-argument constructor.
     *
     * @param observer observer that receives the serialized events
     */
    public SynchronizedObserver(Observer<? super T> observer) {
        this(observer, new SafeObservableSubscription());
    }

    /**
     * Forwards a value to the wrapped observer while holding the lock, unless a terminal event has
     * been delivered or requested, or the subscription has been unsubscribed. The condition is
     * checked once without the lock and again after acquiring it.
     *
     * @param t the value to forward
     */
    @Override
    public void onNext(T t) {
        final boolean skip = finished || finishRequested || subscription.isUnsubscribed();
        if (!skip) {
            synchronized (lock) {
                // State may have changed while waiting for the lock, so check again.
                final boolean skipUnderLock = finished || finishRequested || subscription.isUnsubscribed();
                if (!skipUnderLock) {
                    observer.onNext(t);
                }
            }
        }
    }

    /**
     * Forwards an error to the wrapped observer while holding the lock, unless a terminal event was
     * already delivered or the subscription has been unsubscribed. {@code finishRequested} is raised
     * before the lock is taken, so {@code onNext} calls that check the flag afterwards drop their value.
     *
     * @param th the error to forward
     */
    @Override
    public void onError(Throwable th) {
        final boolean alreadyDone = finished || subscription.isUnsubscribed();
        if (alreadyDone) {
            return;
        }
        finishRequested = true;
        synchronized (lock) {
            if (!finished && !subscription.isUnsubscribed()) {
                observer.onError(th);
                finished = true;
            }
        }
    }

    /**
     * Forwards completion to the wrapped observer while holding the lock, unless a terminal event
     * was already delivered or the subscription has been unsubscribed. {@code finishRequested} is
     * raised before the lock is taken, so {@code onNext} calls that check the flag afterwards drop
     * their value.
     */
    @Override
    public void onCompleted() {
        final boolean alreadyDone = finished || subscription.isUnsubscribed();
        if (alreadyDone) {
            return;
        }
        finishRequested = true;
        synchronized (lock) {
            if (!finished && !subscription.isUnsubscribed()) {
                observer.onCompleted();
                finished = true;
            }
        }
    }
}
