package com.netflix.conductor.contribs.listener;

import com.netflix.conductor.contribs.listener.RestClientManager;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.model.TaskModel;
import java.io.IOException;
import java.lang.Thread;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskStatusListener} that forwards task status changes to an external endpoint.
 *
 * <p>Each {@code onTaskXxx} callback enqueues the task onto a bounded in-memory queue when the
 * corresponding status name is present in the subscribed status list. A single background
 * {@link ConsumerThread} drains the queue, converts each task into a {@code TaskNotification}
 * and posts it through the {@link RestClientManager}.
 *
 * <p>The queue capacity is read once from the {@code ENV_TASK_NOTIFICATION_QUEUE_SIZE}
 * environment variable (default {@code 50}). Producers block in {@link BlockingQueue#put} while
 * the queue is full.
 */
@Singleton
public class TaskStatusPublisher implements TaskStatusListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisher.class);

    /** Capacity of the notification queue (env {@code ENV_TASK_NOTIFICATION_QUEUE_SIZE}, default 50). */
    private static final int QDEPTH =
            Integer.parseInt(System.getenv().getOrDefault("ENV_TASK_NOTIFICATION_QUEUE_SIZE", "50"));

    /** Bounded hand-off queue between the listener callbacks and the consumer thread. */
    private final BlockingQueue<TaskModel> blockingQueue = new LinkedBlockingDeque<>(QDEPTH);
    private final RestClientManager rcm;
    // Stored from the constructor; not read anywhere in this class.
    private final ExecutionDAOFacade executionDAOFacade;
    /** Names of the task statuses for which notifications are published. */
    private final List<String> subscribedTaskStatusList;

    /**
     * Background worker that takes tasks off the queue and publishes them one at a time.
     *
     * <p>Failures while publishing a single task are logged and the loop continues with the
     * next task. An interrupt stops the thread.
     */
    class ConsumerThread extends Thread {
        ConsumerThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            // Anything that escapes the loop below (i.e. an Error) restarts the consumer.
            setUncaughtExceptionHandler(new ExceptionHandler());
            LOGGER.info("{}: Starting consumer thread", Thread.currentThread().getName());
            while (true) {
                // Scoped per iteration so the error path never reports a previous task.
                TaskModel taskModel = null;
                try {
                    taskModel = blockingQueue.take();
                    TaskNotification taskNotification = new TaskNotification(taskModel.toTask());
                    LOGGER.info("Publishing TaskNotification: {}", taskNotification.toJsonString());
                    // Constant-first comparison: a null task type must not raise an NPE here.
                    if ("SUB_WORKFLOW".equals(taskNotification.getTaskType())) {
                        LOGGER.info("Skip task '{}' notification. Task type is SUB_WORKFLOW.", taskNotification.getTaskId());
                    } else {
                        publishTaskNotification(taskNotification);
                        LOGGER.debug("Task {} publish is successful.", taskNotification.getTaskId());
                        // Short pause between consecutive publishes.
                        Thread.sleep(5L);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop: an interrupt is a request to shut down,
                    // not a publishing failure to be retried forever.
                    Thread.currentThread().interrupt();
                    LOGGER.info("{}: Consumer thread interrupted, stopping", Thread.currentThread().getName());
                    return;
                } catch (Exception e) {
                    if (taskModel != null) {
                        LOGGER.error("Error while publishing task. Hence updating elastic search index taskId {} taskname {}", taskModel.getTaskId(), taskModel.getTaskDefName());
                    } else {
                        LOGGER.error("Failed to publish task: Task is NULL");
                    }
                    LOGGER.error("Error on publishing ", e);
                }
            }
        }
    }

    /**
     * Handler installed on every {@link ConsumerThread}: logs the throwable that killed the
     * thread and starts a replacement consumer so the queue keeps being drained.
     */
    class ExceptionHandler implements Thread.UncaughtExceptionHandler {
        ExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            LOGGER.error("An exception has been captured\n");
            LOGGER.error("Thread: {}\n", thread.getName());
            LOGGER.error("Exception: {}: {}\n", th.getClass().getName(), th.getMessage());
            // Route the stack trace through the logger instead of printing it to stdout.
            LOGGER.error("Stack Trace: \n", th);
            LOGGER.error("Thread status: {}\n", thread.getState());
            new ConsumerThread().start();
        }
    }

    /**
     * Creates the publisher and starts its consumer thread.
     *
     * @param restClientManager client used to post notifications
     * @param executionDAOFacade execution DAO facade (stored, not used by this class)
     * @param list names of the task statuses to publish notifications for
     */
    @Inject
    public TaskStatusPublisher(RestClientManager restClientManager, ExecutionDAOFacade executionDAOFacade, List<String> list) {
        this.rcm = restClientManager;
        this.executionDAOFacade = executionDAOFacade;
        this.subscribedTaskStatusList = list;
        validateSubscribedTaskStatuses(list);
        new ConsumerThread().start();
    }

    /** Logs an error for every subscribed status other than {@code SCHEDULED}. */
    private void validateSubscribedTaskStatuses(List<String> list) {
        for (String str : list) {
            if (!str.equals("SCHEDULED")) {
                LOGGER.error("Task Status Type {} will only push notificaitons when updated through the API. Automatic notifications only work for SCHEDULED type.", str);
            }
        }
    }

    /**
     * Puts the task on the notification queue, blocking while the queue is full.
     *
     * <p>A {@code null} task is ignored. If the calling thread is interrupted while waiting,
     * the interrupt flag is restored and the notification is dropped.
     */
    private void enqueueTask(TaskModel taskModel) {
        if (taskModel == null) {
            // The queue rejects nulls, and the failure log below would dereference the task.
            LOGGER.error("Failed to enqueue task: Task is NULL");
            return;
        }
        try {
            this.blockingQueue.put(taskModel);
        } catch (InterruptedException e) {
            // Keep the caller's interrupt status visible to the code that invoked the listener.
            Thread.currentThread().interrupt();
            LOGGER.error("Failed to enqueue task: Id {} Type {} of workflow {} ", taskModel.getTaskId(), taskModel.getTaskType(), taskModel.getWorkflowInstanceId(), e);
        } catch (Exception e) {
            LOGGER.error("Failed to enqueue task: Id {} Type {} of workflow {} ", taskModel.getTaskId(), taskModel.getTaskType(), taskModel.getWorkflowInstanceId(), e);
        }
    }

    /** Enqueues the task when {@code SCHEDULED} is a subscribed status. */
    public void onTaskScheduled(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.SCHEDULED.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code CANCELED} is a subscribed status. */
    public void onTaskCanceled(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.CANCELED.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code COMPLETED} is a subscribed status. */
    public void onTaskCompleted(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.COMPLETED.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code COMPLETED_WITH_ERRORS} is a subscribed status. */
    public void onTaskCompletedWithErrors(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.COMPLETED_WITH_ERRORS.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code FAILED} is a subscribed status. */
    public void onTaskFailed(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.FAILED.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code FAILED_WITH_TERMINAL_ERROR} is a subscribed status. */
    public void onTaskFailedWithTerminalError(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.FAILED_WITH_TERMINAL_ERROR.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code IN_PROGRESS} is a subscribed status. */
    public void onTaskInProgress(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.IN_PROGRESS.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code SKIPPED} is a subscribed status. */
    public void onTaskSkipped(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.SKIPPED.name())) {
            enqueueTask(taskModel);
        }
    }

    /** Enqueues the task when {@code TIMED_OUT} is a subscribed status. */
    public void onTaskTimedOut(TaskModel taskModel) {
        if (this.subscribedTaskStatusList.contains(TaskModel.Status.TIMED_OUT.name())) {
            enqueueTask(taskModel);
        }
    }

    /**
     * Posts the notification (JSON including task input/output) as a {@code TASK} notification.
     *
     * @throws IOException if the REST client fails to post the notification
     */
    private void publishTaskNotification(TaskNotification taskNotification) throws IOException {
        this.rcm.postNotification(RestClientManager.NotificationType.TASK, taskNotification.toJsonStringWithInputOutput(), taskNotification.getTaskId(), null);
    }
}
