package rx.operators;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;

/* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/OperationNext.class */
public final class OperationNext {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/OperationNext$NextIterator.class */
    public static class NextIterator<T> implements Iterator<T> {
        private final NextObserver<? extends T> observer;

        private NextIterator(NextObserver<? extends T> nextObserver) {
            this.observer = nextObserver;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return !this.observer.isCompleted(false);
        }

        @Override // java.util.Iterator
        public T next() {
            if (this.observer.isCompleted(true)) {
                throw new IllegalStateException("Observable is completed");
            }
            this.observer.await();
            try {
                return this.observer.takeNext();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Exceptions.propagate(e);
            }
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("Read only iterator");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/OperationNext$NextObserver.class */
    public static class NextObserver<T> implements Observer<Notification<? extends T>> {
        private final BlockingQueue<Notification<? extends T>> buf;
        private final AtomicBoolean waiting;

        private NextObserver() {
            this.buf = new ArrayBlockingQueue(1);
            this.waiting = new AtomicBoolean(false);
        }

        @Override // rx.Observer
        public void onCompleted() {
        }

        @Override // rx.Observer
        public void onError(Throwable th) {
        }

        @Override // rx.Observer
        public void onNext(Notification<? extends T> notification) {
            if (this.waiting.getAndSet(false) || !notification.isOnNext()) {
                Notification<? extends T> notification2 = notification;
                while (!this.buf.offer(notification2)) {
                    Notification<? extends T> poll = this.buf.poll();
                    if (!poll.isOnNext()) {
                        notification2 = poll;
                    }
                }
            }
        }

        public void await() {
            this.waiting.set(true);
        }

        public boolean isCompleted(boolean z) {
            Notification<? extends T> peek = this.buf.peek();
            if (peek == null) {
                return false;
            }
            if (!peek.isOnError()) {
                return peek.isOnCompleted();
            }
            if (z) {
                throw Exceptions.propagate(peek.getThrowable());
            }
            return true;
        }

        public T takeNext() throws InterruptedException {
            Notification<? extends T> take = this.buf.take();
            if (take.isOnError()) {
                throw Exceptions.propagate(take.getThrowable());
            }
            if (take.isOnCompleted()) {
                throw new IllegalStateException("Observable is completed");
            }
            return take.getValue();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/OperationNext$UnitTest.class */
    public static class UnitTest {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        /* loaded from: input_file:WEB-INF/lib/rxjava-core-0.14.2.jar:rx/operators/OperationNext$UnitTest$TestException.class */
        private static class TestException extends RuntimeException {
            private TestException() {
            }
        }

        @Test
        public void testNext() throws Throwable {
            PublishSubject create = PublishSubject.create();
            Iterator<String> it = OperationNext.next(create).iterator();
            Assert.assertTrue(it.hasNext());
            Future<String> nextAsync = nextAsync(it);
            Thread.sleep(100L);
            create.onNext("one");
            Assert.assertEquals("one", nextAsync.get());
            Assert.assertTrue(it.hasNext());
            Future<String> nextAsync2 = nextAsync(it);
            Thread.sleep(100L);
            create.onNext("two");
            Assert.assertEquals("two", nextAsync2.get());
            Assert.assertTrue(it.hasNext());
            create.onCompleted();
            Assert.assertFalse(it.hasNext());
        }

        @Test(expected = TestException.class)
        public void testOnError() throws Throwable {
            PublishSubject create = PublishSubject.create();
            Iterator<String> it = OperationNext.next(create).iterator();
            Assert.assertTrue(it.hasNext());
            Future<String> nextAsync = nextAsync(it);
            Thread.sleep(100L);
            create.onNext("one");
            Assert.assertEquals("one", nextAsync.get());
            Assert.assertTrue(it.hasNext());
            Future<String> nextAsync2 = nextAsync(it);
            Thread.sleep(100L);
            create.onError(new TestException());
            try {
                nextAsync2.get();
            } catch (ExecutionException e) {
                throw e.getCause();
            }
        }

        @Test
        public void testOnErrorViaHasNext() throws Throwable {
            PublishSubject create = PublishSubject.create();
            Iterator<String> it = OperationNext.next(create).iterator();
            Assert.assertTrue(it.hasNext());
            Future<String> nextAsync = nextAsync(it);
            Thread.sleep(100L);
            create.onNext("one");
            Assert.assertEquals("one", nextAsync.get());
            Assert.assertTrue(it.hasNext());
            nextAsync(it);
            Thread.sleep(100L);
            create.onError(new TestException());
            try {
                Assert.assertFalse(it.hasNext());
            } catch (Throwable th) {
                Assert.fail("should not have received exception");
                th.printStackTrace();
            }
        }

        private Future<String> nextAsync(final Iterator<String> it) throws Throwable {
            return this.executor.submit(new Callable<String>() { // from class: rx.operators.OperationNext.UnitTest.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public String call() throws Exception {
                    return (String) it.next();
                }
            });
        }

        @Test
        public void testNoBufferingOrBlockingOfSequence() throws Throwable {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final CountDownLatch countDownLatch2 = new CountDownLatch(30);
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            Iterator it = OperationNext.next(Observable.create(new Observable.OnSubscribeFunc<Integer>() { // from class: rx.operators.OperationNext.UnitTest.2
                @Override // rx.Observable.OnSubscribeFunc
                public Subscription onSubscribe(final Observer<? super Integer> observer) {
                    new Thread(new Runnable() { // from class: rx.operators.OperationNext.UnitTest.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            while (atomicBoolean.get()) {
                                try {
                                    try {
                                        observer.onNext(Integer.valueOf(atomicInteger.incrementAndGet()));
                                        countDownLatch2.countDown();
                                    } catch (Throwable th) {
                                        observer.onError(th);
                                        countDownLatch.countDown();
                                        return;
                                    }
                                } catch (Throwable th2) {
                                    countDownLatch.countDown();
                                    throw th2;
                                }
                            }
                            observer.onCompleted();
                            countDownLatch.countDown();
                        }
                    }).start();
                    return Subscriptions.empty();
                }
            })).iterator();
            Assert.assertTrue(it.hasNext());
            int intValue = ((Integer) it.next()).intValue();
            Assert.assertTrue(it.hasNext());
            int intValue2 = ((Integer) it.next()).intValue();
            Assert.assertTrue("a and b should be different", intValue != intValue2);
            countDownLatch2.await(8000L, TimeUnit.MILLISECONDS);
            Assert.assertTrue(it.hasNext());
            int intValue3 = ((Integer) it.next()).intValue();
            Assert.assertTrue("c should not just be the next in sequence", intValue3 != intValue2 + 1);
            Assert.assertTrue("expected that c [" + intValue3 + "] is higher than or equal to 30", intValue3 >= 30);
            Assert.assertTrue(it.hasNext());
            atomicBoolean.set(false);
            countDownLatch.await();
            Assert.assertFalse(it.hasNext());
            System.out.println("a: " + intValue + " b: " + intValue2 + " c: " + intValue3);
        }
    }

    public static <T> Iterable<T> next(Observable<? extends T> observable) {
        NextObserver nextObserver = new NextObserver();
        final NextIterator nextIterator = new NextIterator(nextObserver);
        observable.materialize().subscribe(nextObserver);
        return new Iterable<T>() { // from class: rx.operators.OperationNext.1
            @Override // java.lang.Iterable
            public Iterator<T> iterator() {
                return NextIterator.this;
            }
        };
    }
}
