package org.opencord.olt.impl;

import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
import org.opencord.olt.AccessDevicePort;
import org.opencord.olt.AccessDeviceService;
import org.opencord.olt.DiscoveredSubscriber;
import org.opencord.olt.FlowOperation;
import org.opencord.olt.OltDeviceServiceInterface;
import org.opencord.olt.OltFlowServiceInterface;
import org.opencord.olt.OltMeterServiceInterface;
import org.opencord.olt.ServiceKey;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
import org.opencord.sadis.SubscriberAndDeviceInformation;
import org.opencord.sadis.UniTagInformation;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate = true, property = {"defaultBpId:String=Default", "multicastServiceName:String=MC", "flowProcessingThreads:Integer=32", "flowExecutorQueueSize:Integer=128", "subscriberProcessingThreads:Integer=24", "requeueDelay:Integer=500"})
/* loaded from: input_file:org/opencord/olt/impl/Olt.class */
public class Olt extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener> implements AccessDeviceService {

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected ComponentConfigService cfgService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected LeadershipService leadershipService;

    @Reference(cardinality = ReferenceCardinality.OPTIONAL, bind = "bindSadisService", unbind = "unbindSadisService", policy = ReferencePolicy.DYNAMIC)
    protected volatile SadisService sadisService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected OltDeviceServiceInterface oltDeviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected OltFlowServiceInterface oltFlowService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected OltMeterServiceInterface oltMeterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected CoreService coreService;
    protected ApplicationId appId;
    private static final String ONOS_OLT_SERVICE = "onos/olt-service";
    protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
    protected ExecutorService flowsExecutor;
    protected ExecutorService subscriberExecutor;
    private static final String APP_NAME = "org.opencord.olt";
    protected String defaultBpId = OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
    protected String multicastServiceName = OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
    protected int flowProcessingThreads = 32;
    protected int flowExecutorQueueSize = OsgiPropertyConstants.FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
    protected int subscriberProcessingThreads = 24;
    protected int requeueDelay = OsgiPropertyConstants.REQUEUE_DELAY_DEFAULT;
    private final Logger log = LoggerFactory.getLogger(getClass());
    protected OltDeviceListener deviceListener = new OltDeviceListener();
    protected ScheduledExecutorService discoveredSubscriberExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/olt", "discovered-cp-%d", this.log));
    protected ScheduledExecutorService queueExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/olt", "discovered-cp-restore-%d", this.log));
    private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
    private final Lock queueWriteLock = this.queueLock.writeLock();
    private final Lock queueReadLock = this.queueLock.readLock();

    /* renamed from: org.opencord.olt.impl.Olt$1, reason: invalid class name */
    /* loaded from: input_file:org/opencord/olt/impl/Olt$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$onosproject$net$device$DeviceEvent$Type = new int[DeviceEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.PORT_STATS_UPDATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.DEVICE_ADDED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.PORT_ADDED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.PORT_REMOVED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.PORT_UPDATED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$onosproject$net$device$DeviceEvent$Type[DeviceEvent.Type.DEVICE_REMOVED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* loaded from: input_file:org/opencord/olt/impl/Olt$OltDeviceListener.class */
    protected class OltDeviceListener implements DeviceListener {
        private final Logger log = LoggerFactory.getLogger(getClass());
        protected ExecutorService eventExecutor;

        public OltDeviceListener() {
            this.eventExecutor = Executors.newFixedThreadPool(Olt.this.flowProcessingThreads, Tools.groupedThreads("onos/olt-device-listener-event", "event-%d", this.log));
        }

        public void deactivate() {
            this.eventExecutor.shutdown();
        }

        public void event(DeviceEvent deviceEvent) {
            if (this.log.isTraceEnabled()) {
                Logger logger = this.log;
                Object[] objArr = new Object[3];
                objArr[0] = deviceEvent.type();
                objArr[1] = ((Device) deviceEvent.subject()).id();
                objArr[2] = deviceEvent.port() != null ? deviceEvent.port().number() : null;
                logger.trace("OltListener receives event {} for: {}/{}", objArr);
            }
            this.eventExecutor.execute(() -> {
                if (this.log.isTraceEnabled()) {
                    Logger logger2 = this.log;
                    Object[] objArr2 = new Object[3];
                    objArr2[0] = deviceEvent.type();
                    objArr2[1] = ((Device) deviceEvent.subject()).id();
                    objArr2[2] = deviceEvent.port() != null ? deviceEvent.port().number() : null;
                    logger2.trace("OltListener Executor receives event {} for: {}/{}", objArr2);
                }
                boolean isOlt = Olt.this.oltDeviceService.isOlt((Device) deviceEvent.subject());
                DeviceId id = ((Device) deviceEvent.subject()).id();
                switch (AnonymousClass1.$SwitchMap$org$onosproject$net$device$DeviceEvent$Type[deviceEvent.type().ordinal()]) {
                    case 1:
                    case 2:
                        return;
                    case OsgiPropertyConstants.ZERO_REFERENCE_METER_COUNT_DEFAULT /* 3 */:
                    case 4:
                        if (!isOlt) {
                            this.log.trace("Ignoring event {}, this is not an OLT device", id);
                            return;
                        } else if (Olt.this.oltDeviceService.isLocalLeader(id)) {
                            handleOltPort((DeviceEvent.Type) deviceEvent.type(), (Device) deviceEvent.subject(), deviceEvent.port());
                            return;
                        } else {
                            this.log.trace("Device {} is not local to this node", id);
                            return;
                        }
                    case OsgiPropertyConstants.REQUIRED_DRIVERS_PROPERTY_DELAY_DEFAULT /* 5 */:
                        if (!isOlt) {
                            this.log.trace("Ignoring event {}, this is not an OLT device", id);
                            return;
                        }
                        if (!Olt.this.deviceService.isAvailable(id)) {
                            this.log.debug("Ignoring port event {} on {} as it is disconnected", deviceEvent, id);
                            return;
                        } else if (Olt.this.oltDeviceService.isLocalLeader(id)) {
                            handleOltPort((DeviceEvent.Type) deviceEvent.type(), (Device) deviceEvent.subject(), deviceEvent.port());
                            return;
                        } else {
                            this.log.trace("Device {} is not local to this node", id);
                            return;
                        }
                    case 6:
                        if (!isOlt) {
                            this.log.trace("Ignoring event {}, this is not an OLT device", id);
                            return;
                        }
                        if (Olt.this.deviceService.isAvailable(id)) {
                            if (Olt.this.oltDeviceService.isLocalLeader(id)) {
                                this.log.info("Handling available device: {}", id);
                                handleExistingPorts();
                                return;
                            } else {
                                if (this.log.isTraceEnabled()) {
                                    this.log.trace("Device {} is not local to this node, not handling available device", id);
                                    return;
                                }
                                return;
                            }
                        }
                        if (Olt.this.deviceService.isAvailable(id) || !Olt.this.deviceService.getPorts(id).isEmpty()) {
                            this.log.info("Device {} availability changed to false, but ports are still available, assuming temporary disconnection.", id);
                            if (this.log.isTraceEnabled()) {
                                this.log.trace("Available ports: {}", Olt.this.deviceService.getPorts(id));
                                return;
                            }
                            return;
                        }
                        this.log.info("Device {} availability changed to false and ports are empty, purging meters and flows", id);
                        Olt.this.oltFlowService.purgeDeviceFlows(id);
                        Olt.this.oltMeterService.purgeDeviceMeters(id);
                        if (Olt.this.oltDeviceService.isLocalLeader(id)) {
                            this.log.debug("Master, clearing cp status for {}", id);
                            clearQueueForDevice(id);
                            return;
                        }
                        return;
                    case 7:
                        if (!isOlt) {
                            this.log.trace("Ignoring event {}, this is not an OLT device", id);
                            return;
                        }
                        this.log.info("Device {} Removed, purging meters and flows", id);
                        Olt.this.oltFlowService.purgeDeviceFlows(id);
                        Olt.this.oltMeterService.purgeDeviceMeters(id);
                        if (Olt.this.oltDeviceService.isLocalLeader(id)) {
                            this.log.debug("Master, clearing cp status for {}", id);
                            clearQueueForDevice(id);
                            return;
                        }
                        return;
                    default:
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("Not handling event: {}, ", deviceEvent);
                            return;
                        }
                        return;
                }
            });
        }

        protected void clearQueueForDevice(DeviceId deviceId) {
            try {
                Olt.this.queueWriteLock.lock();
                for (Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry : Olt.this.eventsQueues.entrySet()) {
                    if (entry.getKey().deviceId().equals(deviceId)) {
                        Olt.this.eventsQueues.remove(entry.getKey());
                        this.log.debug("Removing key from queue {}", entry.getKey());
                    }
                }
            } finally {
                Olt.this.queueWriteLock.unlock();
            }
        }

        protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
            SubscriberAndDeviceInformation subscriberAndDeviceInformation;
            Logger logger = this.log;
            Object[] objArr = new Object[4];
            objArr[0] = type;
            objArr[1] = OltUtils.portWithName(port);
            objArr[2] = port.isEnabled() ? "ENABLED" : "DISABLED";
            objArr[3] = device.id();
            logger.info("OltDeviceListener receives event {} for port {} with status {} on device {}", objArr);
            boolean isNniPort = Olt.this.oltDeviceService.isNniPort(device, port.number());
            if (!isNniPort && type == DeviceEvent.Type.PORT_ADDED) {
                this.log.debug("Ignoring PORT_ADD on UNI port {}", OltUtils.portWithName(port));
                return;
            }
            AccessDevicePort accessDevicePort = new AccessDevicePort(port);
            if (port.isEnabled()) {
                if (isNniPort) {
                    FlowOperation flowOperation = FlowOperation.ADD;
                    if (type == DeviceEvent.Type.PORT_REMOVED) {
                        this.log.debug("NNI port went down, ignoring event as flows will be removed in the generic device cleanup");
                        return;
                    } else {
                        Olt.this.oltFlowService.handleNniFlows(device, port, flowOperation);
                        return;
                    }
                }
                Olt.this.post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
                boolean isSubscriberServiceProvisioned = Olt.this.oltFlowService.isSubscriberServiceProvisioned(accessDevicePort);
                DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
                if (type == DeviceEvent.Type.PORT_REMOVED) {
                    status = DiscoveredSubscriber.Status.REMOVED;
                    subscriberAndDeviceInformation = OltUtils.getProgrammedSubscriber(Olt.this.oltFlowService, accessDevicePort);
                } else {
                    subscriberAndDeviceInformation = Olt.this.subsService.get(OltUtils.getPortName(port));
                }
                if (subscriberAndDeviceInformation == null) {
                    this.log.error("Subscriber information not found in sadis for port {}", OltUtils.portWithName(port));
                    return;
                } else {
                    Olt.this.addSubscriberToQueue(new DiscoveredSubscriber(device, port, status, isSubscriberServiceProvisioned, subscriberAndDeviceInformation));
                    return;
                }
            }
            if (isNniPort) {
                this.log.debug("NNI port went down, ignoring event as flows will be removed in the generic device cleanup");
                return;
            }
            Olt.this.post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
            if (Olt.this.oltFlowService.hasDefaultEapol(port)) {
                SubscriberAndDeviceInformation subscriberAndDeviceInformation2 = Olt.this.subsService.get(OltUtils.getPortName(port));
                if (subscriberAndDeviceInformation2 == null) {
                    this.log.error("Subscriber information not found in sadis for port {}", OltUtils.portWithName(port));
                    return;
                } else {
                    Olt.this.addSubscriberToQueue(new DiscoveredSubscriber(device, port, DiscoveredSubscriber.Status.REMOVED, false, subscriberAndDeviceInformation2));
                    return;
                }
            }
            if (Olt.this.oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
                SubscriberAndDeviceInformation programmedSubscriber = OltUtils.getProgrammedSubscriber(Olt.this.oltFlowService, accessDevicePort);
                if (programmedSubscriber == null) {
                    programmedSubscriber = (SubscriberAndDeviceInformation) Olt.this.subsService.get(OltUtils.getPortName(port));
                }
                if (programmedSubscriber == null) {
                    this.log.error("Subscriber information not found in sadis for port {}", OltUtils.portWithName(port));
                } else {
                    Olt.this.addSubscriberToQueue(new DiscoveredSubscriber(device, port, DiscoveredSubscriber.Status.REMOVED, true, programmedSubscriber));
                }
            }
        }

        protected void handleExistingPorts() {
            for (DeviceId deviceId : Olt.this.getConnectedOlts()) {
                this.log.info("Handling existing OLT Ports for device {}", deviceId);
                if (Olt.this.oltDeviceService.isLocalLeader(deviceId)) {
                    for (Port port : Olt.this.deviceService.getPorts(deviceId)) {
                        if (!PortNumber.LOCAL.equals(port.number()) && port.isEnabled()) {
                            Olt.this.deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, Olt.this.deviceService.getDevice(deviceId), port);
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/opencord/olt/impl/Olt$ThreadPoolQueue.class */
    public final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
        public ThreadPoolQueue(int i) {
            super(i);
        }

        @Override // java.util.concurrent.ArrayBlockingQueue, java.util.Queue, java.util.concurrent.BlockingQueue
        public boolean offer(Runnable runnable) {
            if (runnable == null) {
                return false;
            }
            try {
                put(runnable);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    @Activate
    protected void activate(ComponentContext componentContext) {
        this.cfgService.registerProperties(getClass());
        modified(componentContext);
        this.appId = this.coreService.registerApplication(APP_NAME);
        this.eventsQueues = this.storageService.consistentMapBuilder().withName("volt-subscriber-queues").withApplicationId(this.appId).withSerializer(Serializer.using(KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{ConnectPoint.class}).register(new Class[]{DiscoveredSubscriber.class}).register(new Class[]{DiscoveredSubscriber.Status.class}).register(new Class[]{SubscriberAndDeviceInformation.class}).register(new Class[]{UniTagInformation.class}).register(new Class[]{LinkedBlockingQueue.class}).build())).build().asJavaMap();
        this.deviceService.addListener(this.deviceListener);
        this.discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
        this.eventDispatcher.addSink(AccessDeviceEvent.class, this.listenerRegistry);
        this.log.info("Started");
        this.deviceListener.handleExistingPorts();
    }

    @Deactivate
    protected void deactivate(ComponentContext componentContext) {
        this.cfgService.unregisterProperties(getClass(), false);
        this.discoveredSubscriberExecutor.shutdown();
        this.deviceService.removeListener(this.deviceListener);
        this.flowsExecutor.shutdown();
        this.subscriberExecutor.shutdown();
        this.deviceListener.deactivate();
        this.log.info("Stopped");
    }

    @Modified
    public void modified(ComponentContext componentContext) {
        Dictionary properties = componentContext != null ? componentContext.getProperties() : new Properties();
        if (componentContext != null) {
            String str = Tools.get(properties, OsgiPropertyConstants.DEFAULT_BP_ID);
            this.defaultBpId = Strings.isNullOrEmpty(str) ? this.defaultBpId : str;
            String str2 = Tools.get(properties, OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME);
            this.multicastServiceName = Strings.isNullOrEmpty(str2) ? this.multicastServiceName : str2;
            String str3 = Tools.get(properties, OsgiPropertyConstants.FLOW_PROCESSING_THREADS);
            int i = this.flowProcessingThreads;
            this.flowProcessingThreads = Strings.isNullOrEmpty(str3) ? i : Integer.parseInt(str3.trim());
            String str4 = Tools.get(properties, OsgiPropertyConstants.FLOW_EXECUTOR_QUEUE_SIZE);
            int i2 = this.flowExecutorQueueSize;
            this.flowExecutorQueueSize = Strings.isNullOrEmpty(str4) ? i2 : Integer.parseInt(str4.trim());
            if (this.flowsExecutor == null || i != this.flowProcessingThreads || i2 != this.flowExecutorQueueSize) {
                if (this.flowsExecutor != null) {
                    this.flowsExecutor.shutdown();
                }
                this.flowsExecutor = new ThreadPoolExecutor(0, this.flowProcessingThreads, 30L, TimeUnit.SECONDS, new ThreadPoolQueue(this.flowExecutorQueueSize), new ThreadPoolExecutor.DiscardPolicy());
            }
            String str5 = Tools.get(properties, OsgiPropertyConstants.SUBSCRIBER_PROCESSING_THREADS);
            int i3 = this.subscriberProcessingThreads;
            this.subscriberProcessingThreads = Strings.isNullOrEmpty(str5) ? i3 : Integer.parseInt(str5.trim());
            if (this.subscriberExecutor == null || i3 != this.subscriberProcessingThreads) {
                if (this.subscriberExecutor != null) {
                    this.subscriberExecutor.shutdown();
                }
                this.subscriberExecutor = Executors.newFixedThreadPool(this.subscriberProcessingThreads, Tools.groupedThreads(ONOS_OLT_SERVICE, "subscriber-installer-%d"));
            }
            String str6 = Tools.get(properties, OsgiPropertyConstants.REQUEUE_DELAY);
            this.requeueDelay = Strings.isNullOrEmpty(str6) ? OsgiPropertyConstants.REQUEUE_DELAY_DEFAULT : Integer.parseInt(str6.trim());
        }
        this.log.info("Modified. Values = {}: {}, {}:{}, {}:{},{}:{}, {}:{}, {}:{}", new Object[]{OsgiPropertyConstants.DEFAULT_BP_ID, this.defaultBpId, OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME, this.multicastServiceName, OsgiPropertyConstants.FLOW_PROCESSING_THREADS, Integer.valueOf(this.flowProcessingThreads), OsgiPropertyConstants.FLOW_EXECUTOR_QUEUE_SIZE, Integer.valueOf(this.flowExecutorQueueSize), OsgiPropertyConstants.SUBSCRIBER_PROCESSING_THREADS, Integer.valueOf(this.subscriberProcessingThreads), OsgiPropertyConstants.REQUEUE_DELAY, Integer.valueOf(this.requeueDelay)});
    }

    public boolean provisionSubscriber(ConnectPoint connectPoint) {
        this.subscriberExecutor.submit(() -> {
            Device device = this.deviceService.getDevice(connectPoint.deviceId());
            Port port = this.deviceService.getPort(device.id(), connectPoint.port());
            AccessDevicePort accessDevicePort = new AccessDevicePort(port);
            if (this.oltDeviceService.isNniPort(device, port.number())) {
                this.log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
                return false;
            }
            this.log.info("Provisioning subscriber on {}", accessDevicePort);
            if (this.oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
                this.log.error("Subscriber on {} is already provisioned", accessDevicePort);
                return false;
            }
            SubscriberAndDeviceInformation subscriberAndDeviceInformation = this.subsService.get(OltUtils.getPortName(port));
            if (subscriberAndDeviceInformation == null) {
                this.log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
                return false;
            }
            DiscoveredSubscriber discoveredSubscriber = new DiscoveredSubscriber(device, port, DiscoveredSubscriber.Status.ADDED, true, subscriberAndDeviceInformation);
            subscriberAndDeviceInformation.uniTagList().forEach(uniTagInformation -> {
                this.oltFlowService.updateProvisionedSubscriberStatus(new ServiceKey(accessDevicePort, uniTagInformation), true);
            });
            addSubscriberToQueue(discoveredSubscriber);
            return true;
        });
        return true;
    }

    public boolean removeSubscriber(ConnectPoint connectPoint) {
        this.subscriberExecutor.submit(() -> {
            Device device = this.deviceService.getDevice(DeviceId.deviceId(connectPoint.deviceId().toString()));
            Port port = this.deviceService.getPort(device.id(), connectPoint.port());
            AccessDevicePort accessDevicePort = new AccessDevicePort(port);
            if (this.oltDeviceService.isNniPort(device, port.number())) {
                this.log.warn("will not un-provision a subscriber on the NNI {}", accessDevicePort);
                return false;
            }
            this.log.info("Un-provisioning subscriber on {}", accessDevicePort);
            if (!this.oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
                this.log.error("Subscriber on {} is not provisioned", accessDevicePort);
                return false;
            }
            SubscriberAndDeviceInformation programmedSubscriber = OltUtils.getProgrammedSubscriber(this.oltFlowService, accessDevicePort);
            if (programmedSubscriber == null) {
                programmedSubscriber = (SubscriberAndDeviceInformation) this.subsService.get(OltUtils.getPortName(port));
            }
            if (programmedSubscriber == null) {
                this.log.error("Subscriber information not found in programmed subscribers or sadis for port {}", accessDevicePort);
                return false;
            }
            DiscoveredSubscriber discoveredSubscriber = new DiscoveredSubscriber(device, port, DiscoveredSubscriber.Status.ADMIN_REMOVED, true, programmedSubscriber);
            programmedSubscriber.uniTagList().forEach(uniTagInformation -> {
                this.oltFlowService.updateProvisionedSubscriberStatus(new ServiceKey(accessDevicePort, uniTagInformation), false);
            });
            addSubscriberToQueue(discoveredSubscriber);
            return true;
        });
        return true;
    }

    public boolean provisionSubscriber(ConnectPoint connectPoint, VlanId vlanId, VlanId vlanId2, Integer num) {
        this.log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}", new Object[]{connectPoint, vlanId, vlanId2, num});
        Device device = this.deviceService.getDevice(connectPoint.deviceId());
        Port port = this.deviceService.getPort(device.id(), connectPoint.port());
        AccessDevicePort accessDevicePort = new AccessDevicePort(port);
        if (this.oltDeviceService.isNniPort(device, port.number())) {
            this.log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
            return false;
        }
        SubscriberAndDeviceInformation subscriberAndDeviceInformation = new SubscriberAndDeviceInformation();
        UniTagInformation uniTagInformation = getUniTagInformation(port, vlanId, vlanId2, num.intValue());
        if (uniTagInformation == null) {
            this.log.error("Can't find Information for subscriber on {}, with cTag {}, stag {}, tpId {}", new Object[]{connectPoint, vlanId, vlanId2, num});
            return false;
        }
        LinkedList linkedList = new LinkedList();
        linkedList.add(uniTagInformation);
        subscriberAndDeviceInformation.setUniTagList(linkedList);
        DiscoveredSubscriber discoveredSubscriber = new DiscoveredSubscriber(device, port, DiscoveredSubscriber.Status.ADDED, true, subscriberAndDeviceInformation);
        ServiceKey serviceKey = new ServiceKey(accessDevicePort, uniTagInformation);
        if (this.oltFlowService.isSubscriberServiceProvisioned(serviceKey)) {
            this.log.error("Subscriber on {} is already provisioned", serviceKey);
            return false;
        }
        this.oltFlowService.updateProvisionedSubscriberStatus(serviceKey, true);
        addSubscriberToQueue(discoveredSubscriber);
        return true;
    }

    public boolean removeSubscriber(ConnectPoint connectPoint, VlanId vlanId, VlanId vlanId2, Integer num) {
        this.log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}", new Object[]{connectPoint, vlanId, vlanId2, num});
        Device device = this.deviceService.getDevice(connectPoint.deviceId());
        Port port = this.deviceService.getPort(device.id(), connectPoint.port());
        AccessDevicePort accessDevicePort = new AccessDevicePort(port);
        if (this.oltDeviceService.isNniPort(device, port.number())) {
            this.log.warn("will not un-provision a subscriber on the NNI {}", accessDevicePort);
            return false;
        }
        SubscriberAndDeviceInformation subscriberAndDeviceInformation = new SubscriberAndDeviceInformation();
        UniTagInformation uniTagInformation = getUniTagInformation(port, vlanId, vlanId2, num.intValue());
        if (uniTagInformation == null) {
            this.log.error("Can't find Information for subscriber on {}, with cTag {}, stag {}, tpId {}", new Object[]{connectPoint, vlanId, vlanId2, num});
            return false;
        }
        LinkedList linkedList = new LinkedList();
        linkedList.add(uniTagInformation);
        subscriberAndDeviceInformation.setUniTagList(linkedList);
        DiscoveredSubscriber discoveredSubscriber = new DiscoveredSubscriber(device, port, DiscoveredSubscriber.Status.ADMIN_REMOVED, true, subscriberAndDeviceInformation);
        ServiceKey serviceKey = new ServiceKey(accessDevicePort, uniTagInformation);
        if (!this.oltFlowService.isSubscriberServiceProvisioned(serviceKey)) {
            this.log.error("Subscriber on {} is not provisioned", serviceKey);
            return false;
        }
        this.oltFlowService.updateProvisionedSubscriberStatus(serviceKey, false);
        addSubscriberToQueue(discoveredSubscriber);
        return true;
    }

    public List<DeviceId> getConnectedOlts() {
        ArrayList arrayList = new ArrayList();
        for (Device device : this.deviceService.getDevices()) {
            if (this.oltDeviceService.isOlt(device)) {
                arrayList.add(device.id());
            }
        }
        return arrayList;
    }

    public ConnectPoint findSubscriberConnectPoint(String str) {
        for (Device device : this.deviceService.getDevices()) {
            for (Port port : this.deviceService.getPorts(device.id())) {
                this.log.trace("Comparing {} with {}", port.annotations().value("portName"), str);
                if (port.annotations().value("portName").equals(str)) {
                    this.log.debug("Found on {}", OltUtils.portWithName(port));
                    return new ConnectPoint(device.id(), port.number());
                }
            }
        }
        return null;
    }

    protected void processDiscoveredSubscribers() {
        this.log.info("Started processDiscoveredSubscribers loop");
        while (true) {
            try {
                try {
                    this.queueReadLock.lock();
                    HashSet hashSet = new HashSet(this.eventsQueues.keySet());
                    this.queueReadLock.unlock();
                    hashSet.forEach(connectPoint -> {
                        try {
                            try {
                                this.queueReadLock.lock();
                                LinkedBlockingQueue<DiscoveredSubscriber> linkedBlockingQueue = this.eventsQueues.get(connectPoint);
                                this.queueReadLock.unlock();
                                if (!this.oltDeviceService.isLocalLeader(connectPoint.deviceId())) {
                                    if (this.log.isTraceEnabled()) {
                                        this.log.trace("Ignoring queue on CP {} as not master of the device", connectPoint);
                                    }
                                } else {
                                    try {
                                        this.flowsExecutor.execute(() -> {
                                            DiscoveredSubscriber discoveredSubscriber;
                                            if (linkedBlockingQueue.isEmpty() || (discoveredSubscriber = (DiscoveredSubscriber) linkedBlockingQueue.peek()) == null) {
                                                return;
                                            }
                                            if (this.log.isTraceEnabled()) {
                                                this.log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}", new Object[]{discoveredSubscriber, OltUtils.portWithName(discoveredSubscriber.port), discoveredSubscriber.status, Boolean.valueOf(discoveredSubscriber.hasSubscriber)});
                                            }
                                            if (discoveredSubscriber.hasSubscriber) {
                                                if (this.oltFlowService.handleSubscriberFlows(discoveredSubscriber, this.defaultBpId, this.multicastServiceName)) {
                                                    removeSubscriberFromQueue(discoveredSubscriber);
                                                }
                                            } else {
                                                if (!this.deviceService.isAvailable(discoveredSubscriber.device.id()) || this.deviceService.getPort(discoveredSubscriber.device.id(), discoveredSubscriber.port.number()) == null) {
                                                    return;
                                                }
                                                if (!this.oltFlowService.handleBasicPortFlows(discoveredSubscriber, this.defaultBpId, this.defaultBpId)) {
                                                    this.log.debug("Not handling basic port flows for {}, leaving in the queue", OltUtils.portWithName(discoveredSubscriber.port));
                                                    return;
                                                }
                                                if (this.log.isTraceEnabled()) {
                                                    this.log.trace("Processing of port {} completed", OltUtils.portWithName(discoveredSubscriber.port));
                                                }
                                                removeSubscriberFromQueue(discoveredSubscriber);
                                            }
                                        });
                                    } catch (Exception e) {
                                        this.log.error("Exception processing subscriber", e);
                                    }
                                }
                            } catch (Throwable th) {
                                this.queueReadLock.unlock();
                                throw th;
                            }
                        } catch (Exception e2) {
                            this.log.error("Cannot get key from queue map", e2);
                            this.queueReadLock.unlock();
                        }
                    });
                    try {
                        TimeUnit.MILLISECONDS.sleep(this.requeueDelay);
                    } catch (InterruptedException e) {
                        this.log.debug("Interrupted while waiting to requeue", e);
                    }
                } catch (Exception e2) {
                    this.log.error("Cannot read keys from queue map", e2);
                    this.queueReadLock.unlock();
                }
            } catch (Throwable th) {
                this.queueReadLock.unlock();
                throw th;
            }
        }
    }

    private UniTagInformation getUniTagInformation(Port port, VlanId vlanId, VlanId vlanId2, int i) {
        String portWithName = OltUtils.portWithName(port);
        this.log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}", new Object[]{portWithName, vlanId, vlanId2, Integer.valueOf(i)});
        Optional findFirst = this.oltFlowService.getProgrammedSubscribers().entrySet().stream().filter(entry -> {
            return ((ServiceKey) entry.getKey()).getPort().equals(new AccessDevicePort(port)) && ((UniTagInformation) entry.getValue()).getPonSTag().equals(vlanId2) && ((UniTagInformation) entry.getValue()).getPonCTag().equals(vlanId);
        }).findFirst();
        if (findFirst.isPresent()) {
            this.log.debug("Subscriber was programmed with uni tag info for {}, innerVlan: {}, outerVlan: {}, tpId: {}", new Object[]{portWithName, vlanId, vlanId2, Integer.valueOf(i)});
            return (UniTagInformation) ((Map.Entry) findFirst.get()).getValue();
        }
        SubscriberAndDeviceInformation subscriberAndDeviceInformation = this.subsService.get(portWithName);
        if (subscriberAndDeviceInformation == null) {
            this.log.warn("Subscriber information doesn't exist for {}", portWithName);
            return null;
        }
        if (subscriberAndDeviceInformation.uniTagList() == null) {
            this.log.warn("Uni tag list is not found for the subscriber {} on {}", subscriberAndDeviceInformation.id(), portWithName);
            return null;
        }
        UniTagInformation uniTagInformation = OltUtils.getUniTagInformation(subscriberAndDeviceInformation, vlanId, vlanId2, i);
        if (uniTagInformation == null) {
            this.subsService.invalidateId(portWithName);
            uniTagInformation = OltUtils.getUniTagInformation(this.subsService.get(portWithName), vlanId, vlanId2, i);
        }
        if (uniTagInformation != null) {
            return uniTagInformation;
        }
        this.log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}", new Object[]{vlanId, vlanId2, Integer.valueOf(i), portWithName});
        return null;
    }

    protected void bindSadisService(SadisService sadisService) {
        this.sadisService = sadisService;
        this.subsService = this.sadisService.getSubscriberInfoService();
        this.log.info("Sadis-service binds to onos.");
    }

    protected void unbindSadisService(SadisService sadisService) {
        this.deviceListener = null;
        this.sadisService = null;
        this.subsService = null;
        this.log.info("Sadis-service unbinds from onos.");
    }

    protected void addSubscriberToQueue(DiscoveredSubscriber discoveredSubscriber) {
        try {
            ConnectPoint connectPoint = new ConnectPoint(discoveredSubscriber.device.id(), discoveredSubscriber.port.number());
            try {
                try {
                    this.queueWriteLock.lock();
                    this.eventsQueues.compute(connectPoint, (connectPoint2, linkedBlockingQueue) -> {
                        LinkedBlockingQueue linkedBlockingQueue = linkedBlockingQueue == null ? new LinkedBlockingQueue() : linkedBlockingQueue;
                        this.log.info("Adding subscriber {} to queue: {} with existing {}", new Object[]{discoveredSubscriber, OltUtils.portWithName(discoveredSubscriber.port), linkedBlockingQueue});
                        linkedBlockingQueue.add(discoveredSubscriber);
                        return linkedBlockingQueue;
                    });
                    this.queueWriteLock.unlock();
                } catch (Throwable th) {
                    this.queueWriteLock.unlock();
                    throw th;
                }
            } catch (ClassCastException | IllegalArgumentException | NullPointerException | UnsupportedOperationException e) {
                this.log.error("Cannot add subscriber {} to queue: {}", OltUtils.portWithName(discoveredSubscriber.port), e.getMessage());
                this.queueWriteLock.unlock();
            }
        } catch (Exception e2) {
            this.log.error("Can't add {} to queue", discoveredSubscriber, e2);
        }
    }

    protected void removeSubscriberFromQueue(DiscoveredSubscriber discoveredSubscriber) {
        ConnectPoint connectPoint = new ConnectPoint(discoveredSubscriber.device.id(), discoveredSubscriber.port.number());
        try {
            try {
                this.queueWriteLock.lock();
                this.eventsQueues.compute(connectPoint, (connectPoint2, linkedBlockingQueue) -> {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("Removing subscriber {} from queue : {} with existing {}", new Object[]{discoveredSubscriber, OltUtils.portWithName(discoveredSubscriber.port), linkedBlockingQueue});
                    }
                    if (linkedBlockingQueue == null) {
                        this.log.warn("Cannot find queue for connectPoint {}", connectPoint);
                        return linkedBlockingQueue;
                    }
                    if (linkedBlockingQueue.remove(discoveredSubscriber)) {
                        this.log.debug("Subscriber {} has been removed from the queue {}", discoveredSubscriber, linkedBlockingQueue);
                        return linkedBlockingQueue;
                    }
                    this.log.warn("Subscriber {} has not been removed from queue, is it still there? {}", discoveredSubscriber, linkedBlockingQueue);
                    return linkedBlockingQueue;
                });
                this.queueWriteLock.unlock();
            } catch (ClassCastException | IllegalArgumentException | NullPointerException | UnsupportedOperationException e) {
                this.log.error("Cannot remove subscriber {} from queue: {}", discoveredSubscriber, e.getMessage());
                this.queueWriteLock.unlock();
            }
        } catch (Throwable th) {
            this.queueWriteLock.unlock();
            throw th;
        }
    }
}
