package org.apache.sling.distribution.journal.impl.subscriber;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.util.Text;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperConfig;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.shared.Delay;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.Strings;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@Component(service = {}, immediate = true, property = {"announceDelay=10000"}, configurationPid = {"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory"})
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.class */
public class DistributionSubscriber {
    private static final long PRECONDITION_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
    static long RETRY_DELAY = TimeUnit.SECONDS.toMillis(5);
    static long MAX_RETRY_DELAY = TimeUnit.MINUTES.toMillis(15);
    static long QUEUE_FETCH_DELAY = TimeUnit.SECONDS.toMillis(1);
    private static final Supplier<LongSupplier> catchAllDelays = () -> {
        return Delay.exponential(RETRY_DELAY, MAX_RETRY_DELAY);
    };
    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);

    @Reference(name = "packageBuilder")
    private DistributionPackageBuilder packageBuilder;

    @Reference
    private SlingSettingsService slingSettings;

    @Reference
    private MessagingProvider messagingProvider;

    @Reference
    private Topics topics;

    @Reference
    private JournalAvailable journalAvailable;

    @Reference(name = "precondition")
    private Precondition precondition;

    @Reference
    private DistributionMetricsService distributionMetricsService;

    @Reference
    BookKeeperFactory bookKeeperFactory;

    @Reference
    private SubscriberReadyStore subscriberReadyStore;
    private volatile Closeable idleReadyCheck;
    private volatile IdleCheck idleCheck;
    private Closeable packagePoller;
    private volatile CommandPoller commandPoller;
    private BookKeeper bookKeeper;
    private Announcer announcer;
    private String subAgentName;
    private String pkgType;
    private Thread queueThread;
    private final BlockingQueue<FullMessage<PackageMessage>> messageBuffer = new LinkedBlockingQueue(8);
    private Set<String> queueNames = Collections.emptySet();
    private volatile boolean running = true;
    private LongSupplier catchAllDelay = catchAllDelays.get();
    private final Delay delay = new Delay();

    @Activate
    public void activate(SubscriberConfiguration subscriberConfiguration, BundleContext bundleContext, Map<String, Object> map) {
        String str = (String) Objects.requireNonNull(this.slingSettings.getSlingId());
        this.subAgentName = Strings.requireNotBlank(subscriberConfiguration.name());
        Objects.requireNonNull(subscriberConfiguration);
        Objects.requireNonNull(bundleContext);
        Objects.requireNonNull(this.packageBuilder);
        Objects.requireNonNull(this.slingSettings);
        Objects.requireNonNull(this.messagingProvider);
        Objects.requireNonNull(this.topics);
        Objects.requireNonNull(this.precondition);
        Objects.requireNonNull(this.bookKeeperFactory);
        Integer num = (Integer) map.getOrDefault("idleMillies", Integer.valueOf(SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS));
        if (subscriberConfiguration.editable()) {
            MessagingProvider messagingProvider = this.messagingProvider;
            Topics topics = this.topics;
            String str2 = this.subAgentName;
            Delay delay = this.delay;
            Objects.requireNonNull(delay);
            this.commandPoller = new CommandPoller(messagingProvider, topics, str, str2, delay::signal);
        }
        if (subscriberConfiguration.subscriberIdleCheck()) {
            this.idleCheck = new SubscriberIdle(num.intValue(), SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, this.subscriberReadyStore.getReadyHolder(this.subAgentName));
            this.idleReadyCheck = new SubscriberIdleCheck(bundleContext, this.idleCheck);
        } else {
            this.idleCheck = new NoopIdle();
        }
        this.queueNames = getNotEmpty(subscriberConfiguration.agentNames());
        this.pkgType = (String) Objects.requireNonNull(this.packageBuilder.getType());
        this.bookKeeper = this.bookKeeperFactory.create(this.packageBuilder, new BookKeeperConfig(this.subAgentName, str, subscriberConfiguration.editable(), subscriberConfiguration.maxRetries(), subscriberConfiguration.packageHandling(), escapeTopicName(this.messagingProvider.getServerUri(), this.topics.getPackageTopic())), this.messagingProvider.createSender(this.topics.getStatusTopic()), this.messagingProvider.createSender(this.topics.getDiscoveryTopic()));
        long loadOffset = this.bookKeeper.loadOffset() + 1;
        this.packagePoller = this.messagingProvider.createPoller(this.topics.getPackageTopic(), Reset.latest, loadOffset > 0 ? this.messagingProvider.assignTo(loadOffset) : null, new HandlerAdapter[]{HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage)});
        this.queueThread = RunnableUtil.startBackgroundThread(this::processQueue, String.format("Queue Processor for Subscriber agent %s", this.subAgentName));
        this.announcer = new Announcer(str, this.subAgentName, this.queueNames, this.messagingProvider.createSender(this.topics.getDiscoveryTopic()), this.bookKeeper, subscriberConfiguration.maxRetries(), subscriberConfiguration.editable(), PropertiesUtil.toInteger(map.get("announceDelay"), SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS));
        LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", new Object[]{this.subAgentName, Long.valueOf(loadOffset), this.queueNames});
    }

    public static String escapeTopicName(URI uri, String str) {
        return String.format("%s%s_%s", uri.getHost(), escape(uri.getPath()), escape(str));
    }

    private static String escape(String str) {
        return Text.escapeIllegalJcrChars(str.replace("/", "_"));
    }

    private Set<String> getNotEmpty(String[] strArr) {
        return (Set) Arrays.stream(strArr).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        }).collect(Collectors.toSet());
    }

    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly(new Closeable[]{this.announcer, this.bookKeeper, this.packagePoller, this.idleReadyCheck, this.idleCheck, this.commandPoller});
        this.running = false;
        try {
            this.queueThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.info("Join interrupted");
        }
        LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}", new Object[]{this.subAgentName, this.queueNames, this.pkgType});
    }

    public DistributionAgentState getState() {
        return this.bookKeeper.getPackageRetries().getSum() > 0 ? DistributionAgentState.BLOCKED : this.messageBuffer.size() > 0 ? DistributionAgentState.RUNNING : DistributionAgentState.IDLE;
    }

    private void handlePackageMessage(MessageInfo messageInfo, PackageMessage packageMessage) {
        if (shouldEnqueue(messageInfo, packageMessage)) {
            enqueue(new FullMessage<>(messageInfo, packageMessage));
            return;
        }
        try {
            this.bookKeeper.skipPackage(messageInfo.getOffset());
        } catch (PersistenceException | LoginException e) {
            LOG.warn("Error marking distribution package {} at offset={} as skipped", new Object[]{packageMessage, Long.valueOf(messageInfo.getOffset()), e});
        }
    }

    private boolean shouldEnqueue(MessageInfo messageInfo, PackageMessage packageMessage) {
        if (!this.queueNames.contains(packageMessage.getPubAgentName())) {
            LOG.info("Skipping distribution package {} at offset={} (not subscribed)", packageMessage, Long.valueOf(messageInfo.getOffset()));
            return false;
        }
        if (this.pkgType.equals(packageMessage.getPkgType())) {
            return true;
        }
        LOG.warn("Skipping distribution package {} at offset={} (bad pkgType)", packageMessage, Long.valueOf(messageInfo.getOffset()));
        return false;
    }

    private void enqueue(FullMessage<PackageMessage> fullMessage) {
        while (this.running) {
            try {
                if (this.messageBuffer.offer(fullMessage, 1000L, TimeUnit.MILLISECONDS)) {
                    this.distributionMetricsService.getItemsBufferSize().increment();
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        throw new RuntimeException();
    }

    private void processQueue() {
        LOG.info("Started Queue processor");
        while (this.running) {
            try {
                fetchAndProcessQueueItem();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.debug(e.getMessage());
            } catch (PreConditionTimeoutException e2) {
                LOG.info(e2.getMessage());
                this.delay.await(RETRY_DELAY);
            } catch (Exception e3) {
                LOG.error("Error processing queue item", e3);
                this.delay.await(this.catchAllDelay.getAsLong());
            }
        }
        LOG.info("Stopped Queue processor");
    }

    private void fetchAndProcessQueueItem() throws InterruptedException, IOException, LoginException, DistributionException, ImportPostProcessException {
        blockingSendStoredStatus();
        FullMessage<PackageMessage> blockingPeekQueueItem = blockingPeekQueueItem();
        Timer.Context time = this.distributionMetricsService.getProcessQueueItemDuration().time();
        try {
            processQueueItem(blockingPeekQueueItem);
            this.messageBuffer.remove();
            this.distributionMetricsService.getItemsBufferSize().decrement();
            this.catchAllDelay = catchAllDelays.get();
            if (time != null) {
                time.close();
            }
        } catch (Throwable th) {
            if (time != null) {
                try {
                    time.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void blockingSendStoredStatus() throws InterruptedException, IOException {
        Timer.Context time = this.distributionMetricsService.getSendStoredStatusDuration().time();
        int i = 0;
        while (this.running) {
            try {
                if (this.bookKeeper.sendStoredStatus(i)) {
                    if (time != null) {
                        time.close();
                        return;
                    }
                    return;
                }
                i++;
            } catch (Throwable th) {
                if (time != null) {
                    try {
                        time.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (time != null) {
            time.close();
        }
        throw new InterruptedException("Shutting down");
    }

    private FullMessage<PackageMessage> blockingPeekQueueItem() throws InterruptedException {
        while (this.running) {
            FullMessage<PackageMessage> peek = this.messageBuffer.peek();
            if (peek != null) {
                return peek;
            }
            this.delay.await(QUEUE_FETCH_DELAY);
        }
        throw new InterruptedException("Shutting down");
    }

    private void processQueueItem(FullMessage<PackageMessage> fullMessage) throws PersistenceException, LoginException, DistributionException, ImportPostProcessException {
        MessageInfo info = fullMessage.getInfo();
        PackageMessage packageMessage = (PackageMessage) fullMessage.getMessage();
        boolean shouldSkip = shouldSkip(info.getOffset());
        PackageMessage.ReqType reqType = packageMessage.getReqType();
        try {
            this.idleCheck.busy(this.bookKeeper.getRetries(packageMessage.getPubAgentName()));
            if (shouldSkip) {
                this.bookKeeper.removePackage(packageMessage, info.getOffset());
            } else if (reqType == PackageMessage.ReqType.INVALIDATE) {
                this.bookKeeper.invalidateCache(packageMessage, info.getOffset());
            } else {
                this.bookKeeper.importPackage(packageMessage, info.getOffset(), info.getCreateTime());
            }
        } finally {
            this.idleCheck.idle();
        }
    }

    private boolean shouldSkip(long j) {
        return isCleared(j) || isSkipped(j);
    }

    private boolean isCleared(long j) {
        return this.commandPoller != null && this.commandPoller.isCleared(j);
    }

    private boolean isSkipped(long j) {
        return waitPrecondition(j) == Precondition.Decision.SKIP;
    }

    private Precondition.Decision waitPrecondition(long j) {
        long currentTimeMillis = System.currentTimeMillis() + PRECONDITION_TIMEOUT;
        while (System.currentTimeMillis() < currentTimeMillis && this.running) {
            Precondition.Decision canProcess = this.precondition.canProcess(this.subAgentName, j);
            if (canProcess != Precondition.Decision.WAIT) {
                return canProcess;
            }
            this.delay.await(100L);
        }
        throw new PreConditionTimeoutException("Timeout waiting for distribution package at offset=" + j + " on status topic");
    }
}
