package com.control4.core.broker;

import android.annotation.SuppressLint;
import androidx.annotation.Nullable;
import com.control4.api.project.ProjectService;
import com.control4.api.project.data.item.Subscription;
import com.control4.core.project.variable.Variable;
import com.control4.log.Log;
import com.control4.rx.RxUtil;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.json.JSONArray;
import org.json.JSONException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes.dex */
public class BrokerVariableOnSubscribe implements ObservableOnSubscribe<Object>, Disposable {
    static final String EVENT_END_SUBSCRIPTION = "endSubscription";
    static final String EVENT_START_SUBSCRIPTION = "startSubscription";
    public static final String TAG = "BrokerVariableOnSubscribe";
    private Subscription brokerSubscription;
    private Disposable brokerUnsubscribeDelay;
    private final long cacheRemoveDelay;
    private final TimeUnit cacheRemoveUnit;
    private String clientId;
    private final Observable<String> clientIdStream;
    private Disposable clientIdSubscription;
    private CompositeDisposable disposables;
    private final Emitter.Listener emitterListener;
    private final boolean isDataToUI;
    private boolean isSubscribed;
    private final List<Long> itemIds;
    private final Action removeFromCache;
    private final ProjectService service;
    private final Observable<Socket> socketStream;
    private ObservableEmitter<Object> subscriber;
    private Map<Long, Variable> variableCache;
    private VariableMapFunc variableMapper;
    private final List<String> variableNames;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BrokerVariableOnSubscribe(boolean z, Observable<String> observable, List<Long> list, List<String> list2, Observable<Socket> observable2, ProjectService projectService, Action action, long j, TimeUnit timeUnit) {
        this(z, observable, list, list2, observable2, projectService, action, j, timeUnit, null, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @SuppressLint({"UseSparseArrays"})
    public BrokerVariableOnSubscribe(boolean z, Observable<String> observable, List<Long> list, List<String> list2, Observable<Socket> observable2, ProjectService projectService, Action action, long j, TimeUnit timeUnit, @Nullable VariableMapFunc variableMapFunc, Map<Long, Variable> map) {
        this.disposables = new CompositeDisposable();
        this.emitterListener = new Emitter.Listener() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$umm3BVu2hfTtm9JhzAaQVdOVlTE
            @Override // io.socket.emitter.Emitter.Listener
            public final void call(Object[] objArr) {
                BrokerVariableOnSubscribe.this.emitValues(objArr);
            }
        };
        this.isDataToUI = z;
        this.clientIdStream = observable;
        this.itemIds = new ArrayList(list);
        this.variableMapper = variableMapFunc;
        this.variableCache = map;
        this.variableNames = !z ? new ArrayList(list2) : null;
        this.socketStream = observable2;
        this.service = projectService;
        this.removeFromCache = action;
        this.cacheRemoveDelay = j;
        this.cacheRemoveUnit = timeUnit;
    }

    private void cancelUnsubscribeDelay() {
        Disposable disposable = this.brokerUnsubscribeDelay;
        if (disposable != null && !disposable.isDisposed()) {
            this.brokerUnsubscribeDelay.dispose();
        }
        this.brokerUnsubscribeDelay = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    @SuppressLint({"CheckResult"})
    public void cleanupSubscription() {
        Disposable disposable = this.clientIdSubscription;
        if (disposable != null && !disposable.isDisposed()) {
            this.clientIdSubscription.dispose();
            this.clientIdSubscription = null;
        }
        if (this.brokerSubscription != null) {
            this.socketStream.take(1L).subscribe(new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$KjbjRpIhjDNscERNmfGIr9-0HA4
                @Override // io.reactivex.functions.Consumer
                public final void accept(Object obj) {
                    BrokerVariableOnSubscribe.this.lambda$cleanupSubscription$5$BrokerVariableOnSubscribe((Socket) obj);
                }
            }, new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$m-iL8flYQfIR93QaMx-Se76CrAM
                @Override // io.reactivex.functions.Consumer
                public final void accept(Object obj) {
                    Log.error(BrokerVariableOnSubscribe.TAG, "Cannot unsubscribe from broker", (Throwable) obj);
                }
            });
            this.brokerSubscription = null;
        }
        try {
            this.removeFromCache.run();
        } catch (Exception e) {
            Log.error(TAG, "Failed to remove from cache.", e);
        }
        this.disposables.clear();
        cancelUnsubscribeDelay();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean clientIdChanged(String str) {
        String str2 = this.clientId;
        return str2 == null || !str2.equals(str);
    }

    private void emitArray(JSONArray jSONArray) {
        for (int i = 0; i < jSONArray.length(); i++) {
            try {
                emitOnNext(jSONArray.get(i));
            } catch (JSONException e) {
                Log.debug(TAG, "Error processing variable array: " + e.getMessage());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void emitOnError(Throwable th) {
        if (this.isSubscribed && !this.subscriber.isDisposed()) {
            this.subscriber.tryOnError(th);
        }
        this.isSubscribed = false;
        cleanupSubscription();
    }

    private void emitOnNext(Object obj) {
        if (this.isDataToUI) {
            if (this.isSubscribed) {
                this.subscriber.onNext(obj);
                return;
            }
            return;
        }
        try {
            Variable apply = this.variableMapper.apply(obj);
            this.variableCache.put(Long.valueOf(apply.id), apply);
            if (this.isSubscribed) {
                this.subscriber.onNext(apply);
            }
        } catch (Exception e) {
            emitOnError(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void emitValues(Object[] objArr) {
        if (objArr[0] instanceof JSONArray) {
            emitArray((JSONArray) objArr[0]);
        } else if (objArr[0] != null) {
            emitOnNext(objArr[0]);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Subscription getSubscription() throws IOException {
        return this.itemIds.size() == 1 ? this.isDataToUI ? this.service.getItemDataToUiSubscription(this.itemIds.get(0).longValue(), this.clientId) : this.service.getItemVariableSubscription(this.itemIds.get(0).longValue(), this.clientId, this.variableNames) : this.isDataToUI ? this.service.getDataToUiSubscription(this.itemIds, this.clientId) : this.service.getVariableSubscription(this.itemIds, this.clientId, this.variableNames);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static /* synthetic */ void lambda$subscribeToBroker$4(Subscription subscription) throws Exception {
    }

    private void onUnsubscribe() {
        this.brokerUnsubscribeDelay = Completable.timer(this.cacheRemoveDelay, this.cacheRemoveUnit).subscribe(new Action() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$yHkWLKfbwmVJt-V1LDRGzm7j0LI
            @Override // io.reactivex.functions.Action
            public final void run() {
                BrokerVariableOnSubscribe.this.cleanupSubscription();
            }
        });
    }

    private void subscribe() {
        cancelUnsubscribeDelay();
        this.clientIdSubscription = this.clientIdStream.filter(new Predicate() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$KwRbPfTkOaSV0qEmUaz0m7PfvEg
            @Override // io.reactivex.functions.Predicate
            public final boolean test(Object obj) {
                boolean clientIdChanged;
                clientIdChanged = BrokerVariableOnSubscribe.this.clientIdChanged((String) obj);
                return clientIdChanged;
            }
        }).subscribe(new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$fI0WJEYDDdwkBgJ4BG5mGZRdOOg
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                BrokerVariableOnSubscribe.this.lambda$subscribe$0$BrokerVariableOnSubscribe((String) obj);
            }
        });
    }

    private void subscribeToBroker() {
        if (this.brokerSubscription != null) {
            return;
        }
        this.disposables.add(this.socketStream.take(1L).observeOn(Schedulers.io()).doOnError(new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$ePdbhXyCnimP0WNUwe7R_QfCJg4
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                Log.error(BrokerVariableOnSubscribe.TAG, "Unable to get valid socket", (Throwable) obj);
            }
        }).switchMap(new Function() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$hhME2mHoaSwXuIf90iYhu-XfnAQ
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return BrokerVariableOnSubscribe.this.lambda$subscribeToBroker$3$BrokerVariableOnSubscribe((Socket) obj);
            }
        }).subscribe(new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$K_iOf2sBRa4fN3pmdFscKLxL8ZM
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                BrokerVariableOnSubscribe.lambda$subscribeToBroker$4((Subscription) obj);
            }
        }, new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$6iTMcijQTmpcn9BZjKEEGhjcoNU
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                BrokerVariableOnSubscribe.this.emitOnError((Throwable) obj);
            }
        }));
    }

    @Override // io.reactivex.disposables.Disposable
    public void dispose() {
        if (this.isSubscribed) {
            onUnsubscribe();
            this.isSubscribed = false;
        }
    }

    @Override // io.reactivex.disposables.Disposable
    public boolean isDisposed() {
        return !this.isSubscribed;
    }

    public /* synthetic */ void lambda$cleanupSubscription$5$BrokerVariableOnSubscribe(Socket socket) throws Exception {
        socket.emit(EVENT_END_SUBSCRIPTION, this.brokerSubscription.subscriptionId);
        socket.off(this.brokerSubscription.subscriptionId, this.emitterListener);
    }

    public /* synthetic */ void lambda$null$2$BrokerVariableOnSubscribe(Socket socket, Subscription subscription) throws Exception {
        this.brokerSubscription = subscription;
        String str = subscription.subscriptionId;
        socket.on(str, this.emitterListener);
        socket.emit(EVENT_START_SUBSCRIPTION, str);
        Log.debug(TAG, "Starting subscription with id " + str);
    }

    public /* synthetic */ void lambda$subscribe$0$BrokerVariableOnSubscribe(String str) throws Exception {
        if (str != null) {
            this.clientId = str;
            this.brokerSubscription = null;
            subscribeToBroker();
        }
    }

    public /* synthetic */ ObservableSource lambda$subscribeToBroker$3$BrokerVariableOnSubscribe(final Socket socket) throws Exception {
        return Single.fromCallable(new Callable() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$McXDC9tpKupEVQDpBy40XSypZ24
            @Override // java.util.concurrent.Callable
            public final Object call() {
                Subscription subscription;
                subscription = BrokerVariableOnSubscribe.this.getSubscription();
                return subscription;
            }
        }).retryWhen(RxUtil.exponentialBackoff()).doOnSuccess(new Consumer() { // from class: com.control4.core.broker.-$$Lambda$BrokerVariableOnSubscribe$5k_szUQnYBsS_DIptfUfa7f3kHo
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                BrokerVariableOnSubscribe.this.lambda$null$2$BrokerVariableOnSubscribe(socket, (Subscription) obj);
            }
        }).toObservable();
    }

    @Override // io.reactivex.ObservableOnSubscribe
    public void subscribe(ObservableEmitter<Object> observableEmitter) {
        this.isSubscribed = true;
        this.subscriber = observableEmitter;
        observableEmitter.setDisposable(this);
        subscribe();
    }
}
