package com.netflix.conductor.contribs.listener.statuschange;

import com.netflix.conductor.contribs.listener.RestClientManager;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.model.WorkflowModel;
import java.io.IOException;
import java.lang.Thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.class */
public class StatusChangePublisher implements WorkflowStatusListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatusChangePublisher.class);
    private static final Integer QDEPTH = Integer.valueOf(Integer.parseInt(System.getenv().getOrDefault("ENV_WORKFLOW_NOTIFICATION_QUEUE_SIZE", "50")));
    private BlockingQueue<WorkflowModel> blockingQueue = new LinkedBlockingDeque(QDEPTH.intValue());
    private RestClientManager rcm;
    private ExecutionDAOFacade executionDAOFacade;

    /* loaded from: input_file:com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher$ConsumerThread.class */
    class ConsumerThread extends Thread {
        ConsumerThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            setUncaughtExceptionHandler(new ExceptionHandler());
            StatusChangePublisher.LOGGER.info("{}: Starting consumer thread", Thread.currentThread().getName());
            StatusChangeNotification statusChangeNotification = null;
            WorkflowModel workflowModel = null;
            while (true) {
                try {
                    workflowModel = StatusChangePublisher.this.blockingQueue.take();
                    statusChangeNotification = new StatusChangeNotification(workflowModel.toWorkflow());
                    StatusChangePublisher.LOGGER.info("Publishing StatusChangeNotification: {}", statusChangeNotification.toJsonString());
                    StatusChangePublisher.this.publishStatusChangeNotification(statusChangeNotification);
                    StatusChangePublisher.LOGGER.debug("Workflow {} publish is successful.", statusChangeNotification.getWorkflowId());
                    Thread.sleep(5L);
                } catch (Exception e) {
                    if (statusChangeNotification != null) {
                        StatusChangePublisher.LOGGER.error(" Error while publishing workflow. Hence updating elastic search index workflowid {} workflowname {} correlationId {}", new Object[]{workflowModel.getWorkflowId(), workflowModel.getWorkflowName(), workflowModel.getCorrelationId()});
                    } else {
                        StatusChangePublisher.LOGGER.error("Failed to publish workflow: Workflow is NULL");
                    }
                    StatusChangePublisher.LOGGER.error("Error on publishing workflow", e);
                }
            }
        }
    }

    /* loaded from: input_file:com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher$ExceptionHandler.class */
    class ExceptionHandler implements Thread.UncaughtExceptionHandler {
        ExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            StatusChangePublisher.LOGGER.info("An exception has been captured\n");
            StatusChangePublisher.LOGGER.info("Thread: {}\n", thread.getName());
            StatusChangePublisher.LOGGER.info("Exception: {}: {}\n", th.getClass().getName(), th.getMessage());
            StatusChangePublisher.LOGGER.info("Stack Trace: \n");
            th.printStackTrace(System.out);
            StatusChangePublisher.LOGGER.info("Thread status: {}\n", thread.getState());
            new ConsumerThread().start();
        }
    }

    @Inject
    public StatusChangePublisher(RestClientManager restClientManager, ExecutionDAOFacade executionDAOFacade) {
        this.rcm = restClientManager;
        this.executionDAOFacade = executionDAOFacade;
        new ConsumerThread().start();
    }

    public void onWorkflowCompleted(WorkflowModel workflowModel) {
        LOGGER.debug("workflows completion {} {}", workflowModel.getWorkflowId(), workflowModel.getWorkflowName());
        try {
            this.blockingQueue.put(workflowModel);
        } catch (Exception e) {
            LOGGER.error("Failed to enqueue workflow: Id {} Name {}", workflowModel.getWorkflowId(), workflowModel.getWorkflowName());
            LOGGER.error(e.toString());
        }
    }

    public void onWorkflowTerminated(WorkflowModel workflowModel) {
        LOGGER.debug("workflows termination {} {}", workflowModel.getWorkflowId(), workflowModel.getWorkflowName());
        try {
            this.blockingQueue.put(workflowModel);
        } catch (Exception e) {
            LOGGER.error("Failed to enqueue workflow: Id {} Name {}", workflowModel.getWorkflowId(), workflowModel.getWorkflowName());
            LOGGER.error(e.getMessage());
        }
    }

    public void onWorkflowCompletedIfEnabled(WorkflowModel workflowModel) {
        onWorkflowCompleted(workflowModel);
    }

    public void onWorkflowTerminatedIfEnabled(WorkflowModel workflowModel) {
        onWorkflowTerminated(workflowModel);
    }

    private void publishStatusChangeNotification(StatusChangeNotification statusChangeNotification) throws IOException {
        this.rcm.postNotification(RestClientManager.NotificationType.WORKFLOW, statusChangeNotification.toJsonStringWithInputOutput(), statusChangeNotification.getWorkflowId(), statusChangeNotification.getStatusNotifier());
    }
}
