package com.lambdaworks.redis.pubsub;

import com.lambdaworks.redis.RedisReactiveCommandsImpl;
import com.lambdaworks.redis.api.rx.Success;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.pubsub.api.rx.ChannelMessage;
import com.lambdaworks.redis.pubsub.api.rx.PatternMessage;
import com.lambdaworks.redis.pubsub.api.rx.RedisPubSubReactiveCommands;
import rx.Observable;
import rx.Subscriber;

/**
 * Reactive (RxJava) pub/sub command facade on top of a {@link StatefulRedisPubSubConnection}.
 *
 * <p>Besides the (P)SUBSCRIBE/(P)UNSUBSCRIBE commands, this class exposes incoming messages as
 * {@link Observable}s via {@link #observeChannels()} and {@link #observePatterns()}.
 *
 * @param <K> key (channel/pattern) type
 * @param <V> value (message) type
 */
public class RedisPubSubReactiveCommandsImpl<K, V> extends RedisReactiveCommandsImpl<K, V> implements RedisPubSubReactiveCommands<K, V> {

    /**
     * {@link Observable.OnSubscribe} that hands the subscriber to a {@link SubscriptionPubSubListener}
     * and registers that listener on the connection when a subscription happens.
     *
     * @param <T> type of the items emitted to the subscriber
     */
    private class PubSubObservable<T> implements Observable.OnSubscribe<T> {
        private final SubscriptionPubSubListener<K, V, T> listener;

        public PubSubObservable(SubscriptionPubSubListener<K, V, T> subscriptionPubSubListener) {
            this.listener = subscriptionPubSubListener;
        }

        /**
         * Activates the listener with the subscriber, signals {@code onStart} and only then registers
         * the listener, so the subscriber is already set when the listener is added to the connection.
         *
         * @param subscriber the subscriber that will receive the messages
         */
        @Override
        public void call(Subscriber<? super T> subscriber) {
            this.listener.activate(subscriber);
            subscriber.onStart();
            RedisPubSubReactiveCommandsImpl.this.addListener(this.listener);
        }
    }

    /**
     * Pub/sub listener base that remembers the {@link Subscriber} it should forward messages to.
     * Subclasses override the {@code message(...)} callback they are interested in.
     *
     * @param <K> key type
     * @param <V> value type
     * @param <T> type of the items emitted to the subscriber
     */
    public static class SubscriptionPubSubListener<K, V, T> extends RedisPubSubAdapter<K, V> {
        // null until activate() is called, and again after the listener detached itself.
        // NOTE(review): written by activate() and read by message(); if those run on different
        // threads this field may need to be volatile - confirm against the connection's threading.
        protected Subscriber<? super T> subscriber;

        private SubscriptionPubSubListener() {
        }

        /**
         * Sets the subscriber that subsequent messages are forwarded to.
         *
         * @param subscriber the target subscriber
         */
        public void activate(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }
    }

    /**
     * Creates the reactive pub/sub API for the given connection.
     *
     * @param statefulRedisPubSubConnection the pub/sub connection commands and listeners operate on
     * @param redisCodec codec passed to the superclass
     */
    public RedisPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, RedisCodec<K, V> redisCodec) {
        super(statefulRedisPubSubConnection, redisCodec);
        this.connection = statefulRedisPubSubConnection;
    }

    /**
     * Registers a listener on the underlying pub/sub connection.
     *
     * @param redisPubSubListener the listener to add
     */
    @Override
    public void addListener(RedisPubSubListener<K, V> redisPubSubListener) {
        getStatefulConnection().addListener(redisPubSubListener);
    }

    /**
     * Returns an {@link Observable} that, once subscribed, registers a listener on the connection and
     * emits every pattern message as a {@link PatternMessage}.
     *
     * <p>Cleanup is lazy: the listener is removed when the first message arrives after the subscriber
     * has unsubscribed.
     *
     * @return observable of pattern messages
     */
    @Override
    public Observable<PatternMessage<K, V>> observePatterns() {
        SubscriptionPubSubListener<K, V, PatternMessage<K, V>> listener = new SubscriptionPubSubListener<K, V, PatternMessage<K, V>>() {
            @Override
            public void message(K pattern, K channel, V message) {
                if (this.subscriber == null) {
                    // Not activated yet, or already detached.
                    return;
                }
                if (this.subscriber.isUnsubscribed()) {
                    // Subscriber is gone: complete it, detach from the connection and drop the reference.
                    this.subscriber.onCompleted();
                    RedisPubSubReactiveCommandsImpl.this.removeListener(this);
                    this.subscriber = null;
                    return;
                }
                this.subscriber.onNext(new PatternMessage<>(pattern, channel, message));
            }
        };
        return Observable.create(new PubSubObservable<>(listener));
    }

    /**
     * Returns an {@link Observable} that, once subscribed, registers a listener on the connection and
     * emits every channel message as a {@link ChannelMessage}.
     *
     * <p>Cleanup is lazy: the listener is removed when the first message arrives after the subscriber
     * has unsubscribed.
     *
     * @return observable of channel messages
     */
    @Override
    public Observable<ChannelMessage<K, V>> observeChannels() {
        SubscriptionPubSubListener<K, V, ChannelMessage<K, V>> listener = new SubscriptionPubSubListener<K, V, ChannelMessage<K, V>>() {
            @Override
            public void message(K channel, V message) {
                if (this.subscriber == null) {
                    // Not activated yet, or already detached.
                    return;
                }
                if (this.subscriber.isUnsubscribed()) {
                    // Subscriber is gone: complete it, detach from the connection and drop the reference.
                    this.subscriber.onCompleted();
                    RedisPubSubReactiveCommandsImpl.this.removeListener(this);
                    this.subscriber = null;
                    return;
                }
                this.subscriber.onNext(new ChannelMessage<>(channel, message));
            }
        };
        return Observable.create(new PubSubObservable<>(listener));
    }

    /**
     * Removes a listener from the underlying pub/sub connection.
     *
     * @param redisPubSubListener the listener to remove
     */
    @Override
    public void removeListener(RedisPubSubListener<K, V> redisPubSubListener) {
        getStatefulConnection().removeListener(redisPubSubListener);
    }

    /**
     * Builds a PSUBSCRIBE command for the given patterns.
     *
     * @param kArr the patterns
     * @return the success observable for the command
     */
    @Override
    public Observable<Success> psubscribe(K... kArr) {
        return pubSubCommand(CommandType.PSUBSCRIBE, kArr);
    }

    /**
     * Builds a PUNSUBSCRIBE command for the given patterns.
     *
     * @param kArr the patterns
     * @return the success observable for the command
     */
    @Override
    public Observable<Success> punsubscribe(K... kArr) {
        return pubSubCommand(CommandType.PUNSUBSCRIBE, kArr);
    }

    /**
     * Builds a SUBSCRIBE command for the given channels.
     *
     * @param kArr the channels
     * @return the success observable for the command
     */
    @Override
    public Observable<Success> subscribe(K... kArr) {
        return pubSubCommand(CommandType.SUBSCRIBE, kArr);
    }

    /**
     * Builds an UNSUBSCRIBE command for the given channels.
     *
     * @param kArr the channels
     * @return the success observable for the command
     */
    @Override
    public Observable<Success> unsubscribe(K... kArr) {
        return pubSubCommand(CommandType.UNSUBSCRIBE, kArr);
    }

    /** Shared body of the four (un)subscribe commands: same output and argument handling, different command type. */
    private Observable<Success> pubSubCommand(CommandType commandType, K... kArr) {
        return getSuccessObservable(createObservable(commandType, new PubSubOutput(this.codec), args(kArr)));
    }

    /** Wraps the given keys into a {@link CommandArgs} built with this instance's codec. */
    private CommandArgs<K, V> args(K... kArr) {
        CommandArgs<K, V> commandArgs = new CommandArgs<>(this.codec);
        commandArgs.addKeys(kArr);
        return commandArgs;
    }

    /**
     * Returns the superclass's connection, narrowed to the pub/sub connection type.
     *
     * @return the underlying pub/sub connection
     */
    @Override
    public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
        return (StatefulRedisPubSubConnection<K, V>) super.getStatefulConnection();
    }
}
