package org.elasticsearch.xpack.ml.datafeed;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJob;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;

/* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedManager.class */
public class DatafeedManager {
    private static final Logger logger = LogManager.getLogger(DatafeedManager.class);
    private final Client client;
    private final ClusterService clusterService;
    private final ThreadPool threadPool;
    private final Supplier<Long> currentTimeSupplier;
    private final Auditor auditor;
    private final DatafeedJobBuilder datafeedJobBuilder;
    private final AutodetectProcessManager autodetectProcessManager;
    private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap();
    private final TaskRunner taskRunner = new TaskRunner();

    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedManager$Holder.class */
    public class Holder {
        private final TransportStartDatafeedAction.DatafeedTask task;
        private final long allocationId;
        private final String datafeedId;
        private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
        private final DatafeedJob datafeedJob;
        private final boolean autoCloseJob;
        private final ProblemTracker problemTracker;
        private final Consumer<Exception> finishHandler;
        volatile Scheduler.Cancellable cancellable;
        private volatile boolean isNodeShuttingDown;

        Holder(TransportStartDatafeedAction.DatafeedTask datafeedTask, String str, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer<Exception> consumer) {
            this.task = datafeedTask;
            this.allocationId = datafeedTask.getAllocationId();
            this.datafeedId = str;
            this.datafeedJob = datafeedJob;
            this.autoCloseJob = datafeedTask.isLookbackOnly();
            this.problemTracker = problemTracker;
            this.finishHandler = consumer;
        }

        String getJobId() {
            return this.datafeedJob.getJobId();
        }

        boolean isRunning() {
            return this.datafeedJob.isRunning();
        }

        boolean isIsolated() {
            return this.datafeedJob.isIsolated();
        }

        public void stop(String str, TimeValue timeValue, Exception exc) {
            if (this.isNodeShuttingDown) {
                return;
            }
            DatafeedManager.logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", str, this.datafeedId, this.datafeedJob.getJobId());
            if (!this.datafeedJob.stop()) {
                DatafeedManager.logger.info("[{}] datafeed [{}] for job [{}] was already stopped", str, this.datafeedId, this.datafeedJob.getJobId());
                return;
            }
            boolean z = false;
            try {
                try {
                    DatafeedManager.logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", str, timeValue, this.datafeedId, this.datafeedJob.getJobId());
                    z = this.datafeedJobLock.tryLock(timeValue.millis(), TimeUnit.MILLISECONDS);
                    DatafeedManager.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", str, this.datafeedId, this.datafeedJob.getJobId(), Boolean.valueOf(z));
                    DatafeedManager.this.runningDatafeedsOnThisNode.remove(Long.valueOf(this.allocationId));
                    if (this.cancellable != null) {
                        this.cancellable.cancel();
                    }
                    DatafeedManager.this.auditor.info(this.datafeedJob.getJobId(), Messages.getMessage(isIsolated() ? "Datafeed isolated" : "Datafeed stopped"));
                    this.finishHandler.accept(exc);
                    DatafeedManager.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", str, this.datafeedId, this.datafeedJob.getJobId(), z ? "" : ", but there may be pending tasks as the timeout [" + timeValue.getStringRep() + "] expired");
                    if (this.autoCloseJob && !isIsolated()) {
                        closeJob();
                    }
                    if (z) {
                        this.datafeedJobLock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    DatafeedManager.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", str, this.datafeedId, this.datafeedJob.getJobId(), Boolean.valueOf(z));
                    DatafeedManager.this.runningDatafeedsOnThisNode.remove(Long.valueOf(this.allocationId));
                    if (this.cancellable != null) {
                        this.cancellable.cancel();
                    }
                    DatafeedManager.this.auditor.info(this.datafeedJob.getJobId(), Messages.getMessage(isIsolated() ? "Datafeed isolated" : "Datafeed stopped"));
                    this.finishHandler.accept(exc);
                    DatafeedManager.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", str, this.datafeedId, this.datafeedJob.getJobId(), z ? "" : ", but there may be pending tasks as the timeout [" + timeValue.getStringRep() + "] expired");
                    if (this.autoCloseJob && !isIsolated()) {
                        closeJob();
                    }
                    if (z) {
                        this.datafeedJobLock.unlock();
                    }
                }
            } catch (Throwable th) {
                DatafeedManager.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", str, this.datafeedId, this.datafeedJob.getJobId(), Boolean.valueOf(z));
                DatafeedManager.this.runningDatafeedsOnThisNode.remove(Long.valueOf(this.allocationId));
                if (this.cancellable != null) {
                    this.cancellable.cancel();
                }
                DatafeedManager.this.auditor.info(this.datafeedJob.getJobId(), Messages.getMessage(isIsolated() ? "Datafeed isolated" : "Datafeed stopped"));
                this.finishHandler.accept(exc);
                DatafeedManager.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", str, this.datafeedId, this.datafeedJob.getJobId(), z ? "" : ", but there may be pending tasks as the timeout [" + timeValue.getStringRep() + "] expired");
                if (this.autoCloseJob && !isIsolated()) {
                    closeJob();
                }
                if (z) {
                    this.datafeedJobLock.unlock();
                }
                throw th;
            }
        }

        public void isolateDatafeed() {
            this.datafeedJob.isolate();
        }

        public void setNodeIsShuttingDown() {
            this.isNodeShuttingDown = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Long executeLookBack(long j, Long l) throws Exception {
            this.datafeedJobLock.lock();
            try {
                if (!isRunning() || isIsolated()) {
                    return null;
                }
                Long runLookBack = this.datafeedJob.runLookBack(j, l);
                this.datafeedJobLock.unlock();
                return runLookBack;
            } finally {
                this.datafeedJobLock.unlock();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long executeRealTime() throws Exception {
            this.datafeedJobLock.lock();
            try {
                if (!isRunning() || isIsolated()) {
                    return -1L;
                }
                return this.datafeedJob.runRealtime();
            } finally {
                this.datafeedJobLock.unlock();
            }
        }

        private void closeJob() {
            JobState jobState = MlTasks.getJobState(getJobId(), DatafeedManager.this.clusterService.state().getMetaData().custom("persistent_tasks"));
            if (jobState != JobState.OPENED) {
                DatafeedManager.logger.debug("[{}] No need to auto-close job as job state is [{}]", getJobId(), jobState);
            } else {
                this.task.waitForPersistentTask((v0) -> {
                    return Objects.isNull(v0);
                }, TimeValue.timeValueSeconds(20L), new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.Holder.1
                    public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
                        CloseJobAction.Request request = new CloseJobAction.Request(Holder.this.getJobId());
                        request.setLocal(true);
                        ClientHelper.executeAsyncWithOrigin(DatafeedManager.this.client, MachineLearning.NAME, CloseJobAction.INSTANCE, request, new ActionListener<CloseJobAction.Response>() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.Holder.1.1
                            public void onResponse(CloseJobAction.Response response) {
                                if (response.isClosed()) {
                                    return;
                                }
                                DatafeedManager.logger.error("[{}] job close action was not acknowledged", Holder.this.getJobId());
                            }

                            public void onFailure(Exception exc) {
                                if ((exc instanceof ElasticsearchStatusException) && ((ElasticsearchStatusException) exc).status() == RestStatus.CONFLICT) {
                                    DatafeedManager.logger.debug("[{}] {}", Holder.this.getJobId(), exc.getMessage());
                                } else {
                                    DatafeedManager.logger.error("[" + Holder.this.getJobId() + "] failed to auto-close job", exc);
                                }
                            }
                        });
                    }

                    public void onFailure(Exception exc) {
                        DatafeedManager.logger.error("Failed to remove datafeed persistent task - will not auto close job [" + Holder.this.getJobId() + "]", exc);
                    }
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/DatafeedManager$TaskRunner.class */
    public class TaskRunner implements ClusterStateListener {
        private final List<TransportStartDatafeedAction.DatafeedTask> tasksToRun;

        private TaskRunner() {
            this.tasksToRun = new CopyOnWriteArrayList();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
            PersistentTasksCustomMetaData custom = DatafeedManager.this.clusterService.state().getMetaData().custom("persistent_tasks");
            if (DatafeedManager.this.getJobState(custom, datafeedTask) == JobState.OPENED && DatafeedManager.this.jobHasOpenAutodetectCommunicator(custom, datafeedTask)) {
                runTask(datafeedTask);
            } else {
                DatafeedManager.logger.info("Datafeed [{}] is waiting for job [{}] to be opened", datafeedTask.getDatafeedId(), DatafeedManager.this.getJobId(datafeedTask));
                this.tasksToRun.add(datafeedTask);
            }
        }

        private void runTask(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
            ThreadContext.StoredContext stashContext = DatafeedManager.this.threadPool.getThreadContext().stashContext();
            try {
                DatafeedManager.this.innerRun((Holder) DatafeedManager.this.runningDatafeedsOnThisNode.get(Long.valueOf(datafeedTask.getAllocationId())), datafeedTask.getDatafeedStartTime(), datafeedTask.getEndTime());
                if (stashContext != null) {
                    stashContext.close();
                }
            } catch (Throwable th) {
                if (stashContext != null) {
                    try {
                        stashContext.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
            if (this.tasksToRun.isEmpty() || !clusterChangedEvent.metaDataChanged()) {
                return;
            }
            PersistentTasksCustomMetaData custom = clusterChangedEvent.previousState().getMetaData().custom("persistent_tasks");
            PersistentTasksCustomMetaData custom2 = clusterChangedEvent.state().getMetaData().custom("persistent_tasks");
            if (Objects.equals(custom, custom2)) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            for (TransportStartDatafeedAction.DatafeedTask datafeedTask : this.tasksToRun) {
                if (DatafeedManager.this.runningDatafeedsOnThisNode.containsKey(Long.valueOf(datafeedTask.getAllocationId()))) {
                    JobState jobState = DatafeedManager.this.getJobState(custom2, datafeedTask);
                    if (jobState == JobState.OPENING || !DatafeedManager.this.jobHasOpenAutodetectCommunicator(custom2, datafeedTask)) {
                        arrayList.add(datafeedTask);
                    } else if (jobState == JobState.OPENED) {
                        runTask(datafeedTask);
                    } else {
                        DatafeedManager.logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", datafeedTask.getDatafeedId(), DatafeedManager.this.getJobId(datafeedTask), jobState);
                        datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20L));
                    }
                }
            }
            this.tasksToRun.retainAll(arrayList);
        }
    }

    public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, Supplier<Long> supplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
        this.client = (Client) Objects.requireNonNull(client);
        this.clusterService = (ClusterService) Objects.requireNonNull(clusterService);
        this.threadPool = threadPool;
        this.currentTimeSupplier = (Supplier) Objects.requireNonNull(supplier);
        this.auditor = (Auditor) Objects.requireNonNull(auditor);
        this.datafeedJobBuilder = (DatafeedJobBuilder) Objects.requireNonNull(datafeedJobBuilder);
        this.autodetectProcessManager = autodetectProcessManager;
        clusterService.addListener(this.taskRunner);
    }

    public void run(TransportStartDatafeedAction.DatafeedTask datafeedTask, Consumer<Exception> consumer) {
        String datafeedId = datafeedTask.getDatafeedId();
        CheckedConsumer checkedConsumer = datafeedJob -> {
            this.runningDatafeedsOnThisNode.put(Long.valueOf(datafeedTask.getAllocationId()), new Holder(datafeedTask, datafeedId, datafeedJob, new ProblemTracker(this.auditor, datafeedJob.getJobId()), consumer));
            datafeedTask.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.1
                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                    DatafeedManager.this.taskRunner.runWhenJobIsOpened(datafeedTask);
                }

                public void onFailure(Exception exc) {
                    consumer.accept(exc);
                }
            });
        };
        Objects.requireNonNull(consumer);
        this.datafeedJobBuilder.build(datafeedId, ActionListener.wrap(checkedConsumer, (v1) -> {
            r1.accept(v1);
        }));
    }

    public void stopDatafeed(TransportStartDatafeedAction.DatafeedTask datafeedTask, String str, TimeValue timeValue) {
        logger.info("[{}] attempt to stop datafeed [{}] [{}]", str, datafeedTask.getDatafeedId(), Long.valueOf(datafeedTask.getAllocationId()));
        Holder remove = this.runningDatafeedsOnThisNode.remove(Long.valueOf(datafeedTask.getAllocationId()));
        if (remove != null) {
            remove.stop(str, timeValue, null);
        }
    }

    public void stopAllDatafeedsOnThisNode(String str) {
        int size = this.runningDatafeedsOnThisNode.size();
        if (size != 0) {
            logger.info("Closing [{}] datafeeds, because [{}]", Integer.valueOf(size), str);
            Iterator<Holder> it = this.runningDatafeedsOnThisNode.values().iterator();
            while (it.hasNext()) {
                it.next().stop(str, TimeValue.timeValueSeconds(20L), null);
            }
        }
    }

    public void isolateAllDatafeedsOnThisNodeBeforeShutdown() {
        Iterator<Holder> it = this.runningDatafeedsOnThisNode.values().iterator();
        while (it.hasNext()) {
            Holder next = it.next();
            next.isolateDatafeed();
            next.setNodeIsShuttingDown();
            it.remove();
        }
    }

    public void isolateDatafeed(long j) {
        Holder holder = this.runningDatafeedsOnThisNode.get(Long.valueOf(j));
        if (holder != null) {
            holder.isolateDatafeed();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void innerRun(final Holder holder, final long j, final Long l) {
        holder.cancellable = Scheduler.wrapAsCancellable(this.threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit((Runnable) new AbstractRunnable() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.2
            public void onFailure(Exception exc) {
                DatafeedManager.logger.error("Failed lookback import for job [" + holder.datafeedJob.getJobId() + "]", exc);
                holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20L), exc);
            }

            protected void doRun() {
                Long l2 = null;
                try {
                    l2 = holder.executeLookBack(j, l);
                } catch (DatafeedJob.AnalysisProblemException e) {
                    if (l == null) {
                        l2 = Long.valueOf(e.nextDelayInMsSinceEpoch);
                    }
                    holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                    if (e.shouldStop) {
                        holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20L), e);
                        return;
                    }
                } catch (DatafeedJob.EmptyDataCountException e2) {
                    if (l == null) {
                        holder.problemTracker.reportEmptyDataCount();
                        l2 = Long.valueOf(e2.nextDelayInMsSinceEpoch);
                    } else {
                        String message = Messages.getMessage("Datafeed lookback retrieved no data");
                        DatafeedManager.logger.warn("[{}] {}", holder.datafeedJob.getJobId(), message);
                        DatafeedManager.this.auditor.warning(holder.datafeedJob.getJobId(), message);
                    }
                } catch (DatafeedJob.ExtractionProblemException e3) {
                    if (l == null) {
                        l2 = Long.valueOf(e3.nextDelayInMsSinceEpoch);
                    }
                    holder.problemTracker.reportExtractionProblem(e3.getCause().getMessage());
                } catch (Exception e4) {
                    DatafeedManager.logger.error("Failed lookback import for job [" + holder.datafeedJob.getJobId() + "]", e4);
                    holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20L), e4);
                    return;
                }
                if (holder.isIsolated()) {
                    return;
                }
                if (l2 != null) {
                    DatafeedManager.this.doDatafeedRealtime(l2.longValue(), holder.datafeedJob.getJobId(), holder);
                } else {
                    holder.stop("no_realtime", TimeValue.timeValueSeconds(20L), null);
                    holder.problemTracker.finishReport();
                }
            }
        }));
    }

    void doDatafeedRealtime(long j, final String str, final Holder holder) {
        if (!holder.isRunning() || holder.isIsolated()) {
            return;
        }
        TimeValue computeNextDelay = computeNextDelay(j);
        logger.debug("Waiting [{}] before executing next realtime import for job [{}]", computeNextDelay, str);
        holder.cancellable = this.threadPool.schedule(new AbstractRunnable() { // from class: org.elasticsearch.xpack.ml.datafeed.DatafeedManager.3
            public void onFailure(Exception exc) {
                DatafeedManager.logger.error("Unexpected datafeed failure for job [" + str + "] stopping...", exc);
                holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20L), exc);
            }

            protected void doRun() {
                long j2;
                try {
                    j2 = holder.executeRealTime();
                    holder.problemTracker.reportNoneEmptyCount();
                } catch (DatafeedJob.AnalysisProblemException e) {
                    j2 = e.nextDelayInMsSinceEpoch;
                    holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                    if (e.shouldStop) {
                        holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20L), e);
                        return;
                    }
                } catch (DatafeedJob.EmptyDataCountException e2) {
                    j2 = e2.nextDelayInMsSinceEpoch;
                    holder.problemTracker.reportEmptyDataCount();
                } catch (DatafeedJob.ExtractionProblemException e3) {
                    j2 = e3.nextDelayInMsSinceEpoch;
                    holder.problemTracker.reportExtractionProblem(e3.getCause().getMessage());
                } catch (Exception e4) {
                    DatafeedManager.logger.error("Unexpected datafeed failure for job [" + str + "] stopping...", e4);
                    holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20L), e4);
                    return;
                }
                holder.problemTracker.finishReport();
                if (j2 >= 0) {
                    DatafeedManager.this.doDatafeedRealtime(j2, str, holder);
                }
            }
        }, computeNextDelay, MachineLearning.DATAFEED_THREAD_POOL_NAME);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getJobId(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
        return this.runningDatafeedsOnThisNode.get(Long.valueOf(datafeedTask.getAllocationId())).getJobId();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public JobState getJobState(PersistentTasksCustomMetaData persistentTasksCustomMetaData, TransportStartDatafeedAction.DatafeedTask datafeedTask) {
        return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), persistentTasksCustomMetaData);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData persistentTasksCustomMetaData, TransportStartDatafeedAction.DatafeedTask datafeedTask) {
        JobTaskState state;
        PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(getJobId(datafeedTask), persistentTasksCustomMetaData);
        if (jobTask == null || (state = jobTask.getState()) == null || state.isStatusStale(jobTask)) {
            return false;
        }
        return this.autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId());
    }

    private TimeValue computeNextDelay(long j) {
        return new TimeValue(Math.max(1L, j - this.currentTimeSupplier.get().longValue()));
    }

    boolean isRunning(long j) {
        return this.runningDatafeedsOnThisNode.containsKey(Long.valueOf(j));
    }
}
