package com.mongodb.stitch.core.services.mongodb.remote.sync.internal;

import com.circleblue.ecrmodel.storage.MongoDBOperation;
import com.mongodb.MongoNamespace;
import com.mongodb.stitch.core.internal.common.AuthMonitor;
import com.mongodb.stitch.core.internal.common.BsonUtils;
import com.mongodb.stitch.core.internal.common.Callback;
import com.mongodb.stitch.core.internal.common.OperationResult;
import com.mongodb.stitch.core.internal.net.NetworkMonitor;
import com.mongodb.stitch.core.internal.net.StitchEvent;
import com.mongodb.stitch.core.internal.net.Stream;
import com.mongodb.stitch.core.services.internal.CoreStitchServiceClient;
import com.mongodb.stitch.core.services.mongodb.remote.CompactChangeEvent;
import com.mongodb.stitch.core.services.mongodb.remote.internal.ResultDecoders;
import java.io.Closeable;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import javax.annotation.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.Codec;
import org.bson.diagnostics.Logger;
import org.bson.diagnostics.Loggers;

/* loaded from: classes3.dex */
/**
 * Listens to the remote change stream of a single namespace and buffers the most recent
 * change event per document id until a consumer collects it.
 *
 * <p>A background runner thread (see {@link #start()} / {@link #stop()}) is handed a weak
 * reference to this listener; {@link #openStream()} and {@link #storeNextEvent()} are the
 * operations this class exposes for opening the stream and draining it one event at a time.
 *
 * <p>The buffered {@code events} map is guarded by the namespace lock supplied at
 * construction. NOTE(review): the {@code watchers} set is a plain {@link HashSet} that is
 * mutated and iterated without synchronization; confirm callers serialize access to it.
 */
public class NamespaceChangeStreamListener implements Closeable {
    private static final Codec<BsonDocument> BSON_DOCUMENT_CODEC = new BsonDocumentCodec();

    private final AuthMonitor authMonitor;
    private final Logger logger;
    private final MongoNamespace namespace;
    private final NetworkMonitor networkMonitor;
    private final NamespaceSynchronizationConfig nsConfig;
    private final ReadWriteLock nsLock;
    private final CoreStitchServiceClient service;

    /** Latest unprocessed event per document id; guarded by {@code nsLock}. */
    private final Map<BsonValue, CompactChangeEvent<BsonDocument>> events = new HashMap<>();
    /** Callbacks notified of every stored event and failed when the listener closes. */
    private final Set<Callback<CompactChangeEvent<BsonDocument>, Object>> watchers = new HashSet<>();

    private Stream<CompactChangeEvent<BsonDocument>> currentStream;
    private Thread runnerThread;

    /**
     * Creates a listener for the given namespace. No stream is opened and no thread is
     * started until {@link #start()} / {@link #openStream()} are called.
     *
     * @param mongoNamespace the namespace whose changes are watched
     * @param namespaceSynchronizationConfig supplies the synchronized document ids and
     *     receives the stale flag when a stream is opened
     * @param coreStitchServiceClient the service used to open the {@code watch} stream
     * @param networkMonitor consulted for connectivity before opening a stream
     * @param authMonitor consulted for login state before opening a stream
     * @param readWriteLock the lock guarding the buffered events and stream lifecycle
     */
    public NamespaceChangeStreamListener(MongoNamespace mongoNamespace, NamespaceSynchronizationConfig namespaceSynchronizationConfig, CoreStitchServiceClient coreStitchServiceClient, NetworkMonitor networkMonitor, AuthMonitor authMonitor, ReadWriteLock readWriteLock) {
        this.namespace = mongoNamespace;
        this.nsConfig = namespaceSynchronizationConfig;
        this.service = coreStitchServiceClient;
        this.networkMonitor = networkMonitor;
        this.authMonitor = authMonitor;
        this.nsLock = readWriteLock;
        this.logger = Loggers.getLogger(String.format("NamespaceChangeStreamListener-%s", mongoNamespace.toString()));
    }

    /** Cancels the current stream, if there is one. */
    private void cancel() {
        final Stream<CompactChangeEvent<BsonDocument>> stream = this.currentStream;
        if (stream != null) {
            stream.cancel();
        }
    }

    /** Completes every registered watcher with a failed result and forgets them all. */
    private void clearWatchers() {
        for (final Callback<CompactChangeEvent<BsonDocument>, Object> watcher : this.watchers) {
            watcher.onComplete(OperationResult.failedResultOf(null));
        }
        this.watchers.clear();
    }

    /**
     * Registers a callback that is invoked for every event stored by
     * {@link #storeNextEvent()}.
     *
     * @param callback the watcher to add
     */
    public void addWatcher(Callback<CompactChangeEvent<BsonDocument>, Object> callback) {
        this.watchers.add(callback);
    }

    /**
     * Closes the current stream (if any), drops the reference to it, and fails all
     * registered watchers. A failure to close the stream is logged, not propagated.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final Stream<CompactChangeEvent<BsonDocument>> stream = this.currentStream;
        if (stream != null) {
            try {
                stream.close();
            } catch (IOException e) {
                this.logger.warn("failed to close change stream", e);
            } finally {
                // Drop the reference even when close() failed so the stream is not reused.
                this.currentStream = null;
            }
        }
        clearWatchers();
    }

    /**
     * Returns a snapshot of all buffered events and empties the buffer.
     *
     * <p>Copy and clear happen under a single write lock so that an event stored
     * concurrently can never be cleared without having been returned.
     *
     * @return a new map of document id to its latest unprocessed event
     */
    public Map<BsonValue, CompactChangeEvent<BsonDocument>> getEvents() {
        this.nsLock.writeLock().lock();
        try {
            final Map<BsonValue, CompactChangeEvent<BsonDocument>> snapshot = new HashMap<>(this.events);
            this.events.clear();
            return snapshot;
        } finally {
            this.nsLock.writeLock().unlock();
        }
    }

    /**
     * Removes and returns the buffered event for one document.
     *
     * @param bsonValue the document id to look up
     * @return the buffered event for that id, or {@code null} if none is buffered
     */
    @Nullable
    public CompactChangeEvent<BsonDocument> getUnprocessedEventForDocumentId(BsonValue bsonValue) {
        this.nsLock.writeLock().lock();
        try {
            // Map.remove returns the previous mapping, so lookup and removal are one step.
            return this.events.remove(bsonValue);
        } finally {
            this.nsLock.writeLock().unlock();
        }
    }

    /**
     * Reports whether a stream exists and is open.
     *
     * @return {@code true} if there is a current stream and it reports itself open
     */
    public synchronized boolean isOpen() {
        final Stream<CompactChangeEvent<BsonDocument>> stream = this.currentStream;
        return stream != null && stream.isOpen();
    }

    /**
     * Opens a {@code watch} stream for the namespace's synchronized document ids.
     *
     * <p>Nothing is opened when the network is disconnected, when there are no
     * synchronized documents, or when the user is logged out. On success the namespace
     * configuration is marked stale.
     *
     * @return {@code true} if a stream was opened and is open, {@code false} otherwise
     * @throws InterruptedException if interrupted while waiting for the namespace write lock
     * @throws IOException declared for callers; propagated from opening the stream
     */
    public boolean openStream() throws InterruptedException, IOException {
        this.logger.info("stream START");
        final Set<BsonValue> synchronizedDocumentIds = this.nsConfig.getSynchronizedDocumentIds();
        if (!this.networkMonitor.isConnected()) {
            this.logger.info("stream END - Network disconnected");
            return false;
        }
        if (synchronizedDocumentIds.isEmpty()) {
            this.logger.info("stream END - No synchronized documents");
            return false;
        }

        this.nsLock.writeLock().lockInterruptibly();
        try {
            if (!this.authMonitor.isLoggedIn()) {
                this.logger.info("stream END - Logged out");
                return false;
            }

            final Document watchArgs = new Document();
            watchArgs.put("database", this.namespace.getDatabaseName());
            watchArgs.put(MongoDBOperation.FN_COLLECTION, this.namespace.getCollectionName());
            watchArgs.put("ids", synchronizedDocumentIds);
            watchArgs.put("useCompactEvents", true);

            final Stream<CompactChangeEvent<BsonDocument>> opened = this.service.streamFunction("watch", Collections.singletonList(watchArgs), ResultDecoders.compactChangeEventDecoder(BSON_DOCUMENT_CODEC));
            this.currentStream = opened;
            if (opened != null && opened.isOpen()) {
                this.nsConfig.setStale(true);
                return true;
            }
            return false;
        } finally {
            this.nsLock.writeLock().unlock();
        }
    }

    /**
     * Unregisters a previously added watcher.
     *
     * @param callback the watcher to remove
     */
    public void removeWatcher(Callback<CompactChangeEvent<BsonDocument>, Object> callback) {
        this.watchers.remove(callback);
    }

    /** Starts the background runner thread unless one has already been started. */
    public void start() {
        this.nsLock.writeLock().lock();
        try {
            if (this.runnerThread != null) {
                return;
            }
            final Thread runner = new Thread(new NamespaceChangeStreamRunner(new WeakReference<>(this), this.networkMonitor, this.logger));
            this.runnerThread = runner;
            runner.start();
        } finally {
            this.nsLock.writeLock().unlock();
        }
    }

    /**
     * Stops the runner thread: interrupts it, cancels and closes the stream, and waits
     * for the thread to terminate.
     *
     * <p>If the calling thread is itself interrupted while waiting, its interrupt flag is
     * restored and the method returns early, leaving the runner reference in place so a
     * later call can finish the shutdown.
     */
    public void stop() {
        final Thread pending = this.runnerThread;
        if (pending == null) {
            return;
        }
        // Interrupt before taking the lock: the runner acquires the write lock
        // interruptibly, so this makes it give the lock up instead of blocking us.
        pending.interrupt();

        this.nsLock.writeLock().lock();
        try {
            // Re-read under the lock; another stop() may have completed in the meantime.
            final Thread runner = this.runnerThread;
            if (runner == null) {
                return;
            }
            cancel();
            close();
            while (runner.isAlive()) {
                runner.interrupt();
                try {
                    runner.join(1000L);
                } catch (InterruptedException e) {
                    this.logger.info("interrupted while waiting for the stream runner to stop");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            this.runnerThread = null;
        } catch (Exception e) {
            this.logger.error("failed to stop the stream runner", e);
        } finally {
            this.nsLock.writeLock().unlock();
        }
    }

    /**
     * Reads the next event from the current stream, buffers it under its document id,
     * and notifies all watchers.
     *
     * <p>Returns silently when there is no open stream, no event, or an event without
     * data. An {@link IOException} or {@link InterruptedException} is logged and turned
     * into an interrupt of the current thread; any other exception is logged and the
     * listener is closed, preserving the thread's interrupt status.
     */
    public void storeNextEvent() {
        try {
            final Stream<CompactChangeEvent<BsonDocument>> stream = this.currentStream;
            if (stream == null || !stream.isOpen()) {
                return;
            }
            // Use the local reference: close() may null the field concurrently.
            final StitchEvent<CompactChangeEvent<BsonDocument>> nextEvent = stream.nextEvent();
            if (nextEvent == null) {
                return;
            }
            if (nextEvent.getError() != null) {
                throw nextEvent.getError();
            }
            if (nextEvent.getData() == null) {
                return;
            }
            this.logger.debug(String.format(Locale.US, "NamespaceChangeStreamListener::stream ns=%s event found: op=%s documentKey=%s", this.nsConfig.getNamespace(), nextEvent.getData().getOperationType(), nextEvent.getData().getDocumentKey().toJson()));

            this.nsLock.writeLock().lockInterruptibly();
            try {
                this.events.put(BsonUtils.getDocumentId(nextEvent.getData().getDocumentKey()), nextEvent.getData());
            } finally {
                this.nsLock.writeLock().unlock();
            }

            // Watchers are notified after the lock is released so callbacks never run under it.
            for (final Callback<CompactChangeEvent<BsonDocument>, Object> watcher : this.watchers) {
                watcher.onComplete(OperationResult.successfulResultOf(nextEvent.getData()));
            }
        } catch (IOException | InterruptedException e) {
            this.logger.info(String.format(Locale.US, "NamespaceChangeStreamListener::stream ns=%s interrupted on fetching next event: %s", this.nsConfig.getNamespace(), e));
            this.logger.info("stream END – INTERRUPTED");
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            this.logger.error(String.format(Locale.US, "NamespaceChangeStreamListener::stream ns=%s exception on fetching next event: %s", this.nsConfig.getNamespace(), e), e);
            this.logger.info("stream END – EXCEPTION");
            // Remember the interrupt status across close() and re-assert it afterwards.
            final boolean wasInterrupted = Thread.currentThread().isInterrupted();
            close();
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
