package org.apache.sling.distribution.journal.impl.publisher;

import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Topology change handler that, for every offset reported as processed, looks up the
 * matching queue item and sends a "package distributed" event through the
 * {@link EventAdmin}. When a non-blank event topic is configured, a
 * {@link PackageDistributedMessage} is additionally sent to that topic.
 *
 * <p>The offset of the last item for which an event was sent is remembered per
 * publisher agent in memory and can be persisted with
 * {@link #storeLastDistributedOffset()}.
 */
@ParametersAreNonnullByDefault
public class PackageDistributedNotifier implements TopologyChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);

    /** Key under which the last raised event offset is kept in each {@link LocalStore}. */
    public static final String STORE_TYPE_OFFSETS = "lastRaisedEventOffset";

    /** Queue item property holding the request paths. */
    private static final String REQUEST_PATHS = "request.paths";

    /** Queue item property holding the request deep paths. */
    private static final String REQUEST_DEEP_PATHS = "request.deepPaths";

    /** Offset of the last item for which an event was sent, keyed by publisher agent name. */
    private final ConcurrentMap<String, Long> lastDistributedOffsets = new ConcurrentHashMap<>();

    /** Lazily created stores, keyed by publisher agent name. */
    private final ConcurrentMap<String, LocalStore> localStores = new ConcurrentHashMap<>();

    private final EventAdmin eventAdmin;
    private final PubQueueProvider pubQueueCacheService;
    private final MessagingProvider messagingProvider;
    private final Topics topics;
    private final ResourceResolverFactory resolverFactory;

    /** Sender for the event topic; {@code null} when {@link #sendMsg} is {@code false}. */
    private final Consumer<PackageDistributedMessage> sender;

    /** Whether distributed messages are sent in addition to events. */
    private final boolean sendMsg;

    /** Whether the last stored offset is taken into account when selecting the queue start. */
    private final boolean ensureEvent;

    /**
     * Creates the notifier. A message sender is only created when the event topic
     * returned by {@code topics.getEventTopic()} is not blank.
     *
     * @param eventAdmin used to send the package distributed events
     * @param pubQueueProvider provides the offset queues the items are read from
     * @param messagingProvider used to create the message sender and to obtain the server URI
     * @param topics supplies the event topic and the package topic
     * @param resourceResolverFactory passed to the {@link LocalStore} instances created here
     * @param z the "ensure event" flag; when {@code true} the queue is requested starting at the
     *          smaller of the first processed offset and the last remembered offset
     */
    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueProvider, MessagingProvider messagingProvider, Topics topics, ResourceResolverFactory resourceResolverFactory, boolean z) {
        this.eventAdmin = eventAdmin;
        this.pubQueueCacheService = pubQueueProvider;
        this.messagingProvider = messagingProvider;
        this.topics = topics;
        this.resolverFactory = resourceResolverFactory;
        this.ensureEvent = z;

        String eventTopic = topics.getEventTopic();
        this.sendMsg = StringUtils.isNotBlank(eventTopic);
        if (this.sendMsg) {
            this.sender = messagingProvider.createSender(eventTopic);
        } else {
            this.sender = null;
        }
        LOG.info("Started package distributed notifier with event message topic {}", eventTopic);
    }

    /**
     * Processes the offsets reported by the given topology diff, one publisher agent at a time.
     *
     * @param topologyViewDiff the diff whose processed offsets are notified
     */
    @Override
    public void changed(TopologyViewDiff topologyViewDiff) {
        topologyViewDiff.getProcessedOffsets().forEach(this::processOffsets);
    }

    /**
     * Sends notifications for every processed offset of one agent that still has an item
     * in the offset queue.
     *
     * @param pubAgentName name of the publisher agent
     * @param offsets supplier of the processed offsets; invoked more than once, so each call
     *                must return a fresh stream
     */
    private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
        OptionalLong firstOffset = offsets.get().findFirst();
        if (!firstOffset.isPresent()) {
            // Nothing was processed for this agent; an unguarded getAsLong() would throw here.
            return;
        }

        long minOffset = firstOffset.getAsLong();
        if (ensureEvent) {
            long lastDistributed = lastDistributedOffsets.computeIfAbsent(pubAgentName, this::getLastStoredDistributedOffset);
            minOffset = Math.min(minOffset, lastDistributed);
        }

        OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
        // The bound method reference null-checks offsetQueue when it is evaluated.
        offsets.get()
                .mapToObj(offsetQueue::getItem)
                .filter(Objects::nonNull)
                .forEach(queueItem -> notifyDistributed(pubAgentName, queueItem));
    }

    /**
     * Loads the persisted offset for the agent, creating its {@link LocalStore} on first use.
     *
     * @param pubAgentName name of the publisher agent
     * @return the stored offset, or {@link Long#MAX_VALUE} as the default passed to the store
     */
    private long getLastStoredDistributedOffset(String pubAgentName) {
        LocalStore store = localStores.computeIfAbsent(pubAgentName, this::newLocalStore);
        return store.load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
    }

    /**
     * Creates the store for an agent, identified by the escaped server URI / package topic pair
     * and the agent name.
     *
     * @param pubAgentName name of the publisher agent
     * @return a new store instance
     */
    private LocalStore newLocalStore(String pubAgentName) {
        String packageNodeName = DistributionSubscriber.escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
        return new LocalStore(resolverFactory, packageNodeName, pubAgentName);
    }

    /**
     * Persists the last remembered offset of every agent that has a store, skipping agents
     * whose stored value already matches. Agents without a remembered offset are compared
     * against {@link Long#MAX_VALUE}. A failure while storing is logged and does not stop
     * the remaining agents from being processed.
     */
    public void storeLastDistributedOffset() {
        for (Map.Entry<String, LocalStore> entry : localStores.entrySet()) {
            String pubAgentName = entry.getKey();
            LocalStore store = entry.getValue();
            long lastDistributed = lastDistributedOffsets.getOrDefault(pubAgentName, Long.MAX_VALUE);
            long lastStored = store.load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
            if (lastDistributed == lastStored) {
                continue;
            }
            try {
                store.store(STORE_TYPE_OFFSETS, lastDistributed);
                LOG.info("The offset={} has been stored for the pubAgentName={}", lastDistributed, pubAgentName);
            } catch (Exception e) {
                LOG.warn("Exception when storing the last distributed offset in the repository", e);
            }
        }
    }

    /**
     * Sends the distributed event for the item and, when an event topic is configured,
     * the corresponding distributed message.
     *
     * @param str name of the publisher agent
     * @param distributionQueueItem the queue item that was distributed
     */
    protected void notifyDistributed(String str, DistributionQueueItem distributionQueueItem) {
        LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", str, distributionQueueItem.getPackageId());
        sendEvt(str, distributionQueueItem);
        if (sendMsg) {
            sendMsg(str, distributionQueueItem);
        }
    }

    /**
     * Sends the distributed message for the item; any exception is logged and swallowed.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the queue item that was distributed
     */
    private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
        try {
            sender.accept(createDistributedMessage(pubAgentName, queueItem));
        } catch (Exception e) {
            // Trailing throwable is logged as the exception, not as a placeholder argument.
            LOG.warn("Exception when sending package distributed message for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
        }
    }

    /**
     * Builds the distributed message from the item's package id, record offset, paths and deep paths.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the queue item the values are read from
     * @return the message to send
     */
    private PackageDistributedMessage createDistributedMessage(String pubAgentName, DistributionQueueItem queueItem) {
        return PackageDistributedMessage.builder()
                .pubAgentName(pubAgentName)
                .packageId(queueItem.getPackageId())
                .offset((Long) queueItem.get(QueueItemFactory.RECORD_OFFSET))
                .paths((String[]) queueItem.get(REQUEST_PATHS))
                .deepPaths((String[]) queueItem.get(REQUEST_DEEP_PATHS))
                .build();
    }

    /**
     * Sends the distributed event and, only if that succeeded, remembers the item's record
     * offset for the agent ({@link Long#MAX_VALUE} when the item carries none). Any exception
     * is logged and swallowed.
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the queue item that was distributed
     */
    private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
        try {
            eventAdmin.sendEvent(DistributionEvent.eventPackageDistributed(queueItem, pubAgentName));
            lastDistributedOffsets.put(pubAgentName, (Long) queueItem.getOrDefault(QueueItemFactory.RECORD_OFFSET, Long.MAX_VALUE));
        } catch (Exception e) {
            // Trailing throwable is logged as the exception, not as a placeholder argument.
            LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
        }
    }
}
