package io.vertx.proton.streams.impl;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonLinkOptions;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonConnectionImpl;
import io.vertx.proton.impl.ProtonDeliveryImpl;
import io.vertx.proton.streams.ProtonSubscriber;
import io.vertx.proton.streams.ProtonSubscriberOptions;
import io.vertx.proton.streams.Tracker;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.reactivestreams.Subscription;

/* loaded from: input_file:BOOT-INF/lib/vertx-proton-3.9.1.jar:io/vertx/proton/streams/impl/ProtonSubscriberImpl.class */
/**
 * Reactive Streams subscriber that forwards each received {@link Tracker}'s message over a
 * {@link ProtonSender} created from the supplied connection.
 *
 * <p>Demand is signalled upstream from the sender's send-queue-drain handler: whenever the
 * sender reports more credit than has already been requested and not yet consumed, the
 * difference is requested from the {@link Subscription}. Work that touches the sender is
 * scheduled through the connection's {@link Context}.
 */
public class ProtonSubscriberImpl implements ProtonSubscriber<Tracker> {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonSubscriberImpl.class);

    /** Upstream subscription; assigned by the first accepted {@link #onSubscribe} call. */
    private Subscription sub;
    /** Context obtained from the connection; sender operations are scheduled on it. */
    private final Context connCtx;
    private final ProtonConnectionImpl conn;
    private final ProtonSender sender;
    /** Set once a subscription has been accepted; later subscriptions are cancelled. */
    private final AtomicBoolean subscribed = new AtomicBoolean();
    /** Set by the first {@link #onError} or {@link #onComplete}; later signals are ignored. */
    private final AtomicBoolean completed = new AtomicBoolean();
    /** Set once the upstream subscription has been cancelled by this subscriber. */
    private final AtomicBoolean cancelledSub = new AtomicBoolean();
    /** When true, the connection end handler cancels the upstream subscription. */
    private boolean emitOnConnectionEnd = true;
    /** Number of elements requested upstream that have not yet been handed to the sender. */
    private long outstandingRequests = 0L;

    /**
     * Creates a subscriber using a default-constructed {@link ProtonSubscriberOptions}.
     *
     * @param str address passed to {@code createSender} on the connection
     * @param protonConnectionImpl connection used to create the sender and obtain the context
     */
    public ProtonSubscriberImpl(String str, ProtonConnectionImpl protonConnectionImpl) {
        this(str, protonConnectionImpl, new ProtonSubscriberOptions());
    }

    /**
     * Creates a subscriber whose sender is created (but not yet opened) on the given connection.
     *
     * @param str address passed to {@code createSender} on the connection
     * @param protonConnectionImpl connection used to create the sender and obtain the context
     * @param protonSubscriberOptions options; a non-null link name is copied to the link options
     */
    public ProtonSubscriberImpl(String str, ProtonConnectionImpl protonConnectionImpl, ProtonSubscriberOptions protonSubscriberOptions) {
        this.connCtx = protonConnectionImpl.getContext();
        this.conn = protonConnectionImpl;

        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        String linkName = protonSubscriberOptions.getLinkName();
        if (linkName != null) {
            linkOptions.setLinkName(linkName);
        }

        this.sender = protonConnectionImpl.createSender(str, linkOptions);
        // Auto-drain is switched off on the sender; credit handling is done by this class.
        this.sender.setAutoDrained(false);
    }

    /**
     * Accepts the first subscription and schedules handler registration plus sender open on the
     * connection context. Any further subscription is cancelled immediately.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is null
     */
    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "A subscription must be supplied");

        if (!this.subscribed.compareAndSet(false, true)) {
            LOG.trace("Only a single Subscription is supported and already subscribed, cancelling new subscriber.");
            subscription.cancel();
            return;
        }

        this.sub = subscription;
        this.connCtx.runOnContext(ignored -> registerHandlersAndOpen());
    }

    /** Registers the connection/sender handlers and then opens the sender. */
    private void registerHandlersAndOpen() {
        this.conn.addEndHandler(ended -> {
            if (this.emitOnConnectionEnd) {
                cancelSub();
            }
        });

        this.sender.sendQueueDrainHandler(this::requestForSpareCredit);

        this.sender.detachHandler(detachResult -> {
            cancelSub();
            this.sender.detach();
        });

        this.sender.closeHandler(closeResult -> {
            cancelSub();
            this.sender.close();
        });

        this.sender.openHandler(openResult -> LOG.trace("Attach received"));

        this.sender.open();
    }

    /**
     * Requests from upstream however much of the link's credit is not already covered by
     * outstanding requests. Does nothing once completed or cancelled.
     */
    private void requestForSpareCredit(ProtonSender link) {
        if (!this.completed.get() && !this.cancelledSub.get()) {
            long available = link.getCredit();
            long shortfall = available - this.outstandingRequests;
            if (shortfall > 0) {
                this.outstandingRequests += shortfall;
                this.sub.request(shortfall);
            }
        }
    }

    /** Cancels the upstream subscription; only the first call has any effect. */
    private void cancelSub() {
        if (this.cancelledSub.compareAndSet(false, true)) {
            this.sub.cancel();
        }
    }

    /**
     * Schedules the tracker's message to be sent on the connection context, unless the stream
     * has already been completed.
     *
     * @param tracker element to send; cast to {@code TrackerImpl} when the send is performed
     * @throws NullPointerException if {@code tracker} is null
     */
    @Override // org.reactivestreams.Subscriber
    public void onNext(Tracker tracker) {
        Objects.requireNonNull(tracker, "An element must be supplied when calling onNext");

        if (!this.completed.get()) {
            this.connCtx.runOnContext(ignored -> sendTracked(tracker));
        }
    }

    /**
     * Consumes one outstanding request, sends the tracker's message and records the resulting
     * delivery on the tracker. The tracker's handler, if any, is invoked from the send callback.
     */
    private void sendTracked(Tracker tracker) {
        this.outstandingRequests--;
        TrackerImpl impl = (TrackerImpl) tracker;

        ProtonDeliveryImpl delivery = (ProtonDeliveryImpl) this.sender.send(tracker.message(), update -> {
            Handler<Tracker> callback = impl.handler();
            if (callback != null) {
                callback.handle(impl);
            }
        });
        impl.setDelivery(delivery);
    }

    /**
     * Marks the stream completed and closes the sender; ignored if already completed.
     *
     * @param th the upstream error (only null-checked here)
     * @throws NullPointerException if {@code th} is null
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        Objects.requireNonNull(th, "An error must be supplied when calling onError");
        finishOnce();
    }

    /** Marks the stream completed and closes the sender; ignored if already completed. */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        finishOnce();
    }

    /**
     * On the first call only, schedules a task on the connection context that clears the
     * sender's drain/detach/close handlers and then closes the sender.
     */
    private void finishOnce() {
        if (this.completed.compareAndSet(false, true)) {
            this.connCtx.runOnContext(ignored -> {
                // Handlers are removed first so the close below does not re-enter them.
                this.sender.sendQueueDrainHandler(null);
                this.sender.detachHandler(null);
                this.sender.closeHandler(null);
                this.sender.close();
            });
        }
    }

    /** Sets the source on the underlying sender and returns this subscriber. */
    @Override // io.vertx.proton.streams.ProtonSubscriber
    public ProtonSubscriber<Tracker> setSource(Source source) {
        this.sender.setSource(source);
        return this;
    }

    /** Returns the source reported by the underlying sender. */
    @Override // io.vertx.proton.streams.ProtonSubscriber
    public Source getSource() {
        return this.sender.getSource();
    }

    /** Sets the target on the underlying sender and returns this subscriber. */
    @Override // io.vertx.proton.streams.ProtonSubscriber
    public ProtonSubscriber<Tracker> setTarget(Target target) {
        this.sender.setTarget(target);
        return this;
    }

    /** Returns the target reported by the underlying sender. */
    @Override // io.vertx.proton.streams.ProtonSubscriber
    public Target getTarget() {
        return this.sender.getTarget();
    }

    /** Returns the remote source reported by the underlying sender. */
    public Source getRemoteSource() {
        return this.sender.getRemoteSource();
    }

    /** Returns the remote target reported by the underlying sender. */
    public Target getRemoteTarget() {
        return this.sender.getRemoteTarget();
    }

    /** Returns whether the connection end handler cancels the upstream subscription. */
    public boolean isEmitOnConnectionEnd() {
        return this.emitOnConnectionEnd;
    }

    /**
     * Sets whether the connection end handler cancels the upstream subscription.
     *
     * @param z true to cancel the subscription when the connection ends
     */
    public void setEmitOnConnectionEnd(boolean z) {
        this.emitOnConnectionEnd = z;
    }

    /** Returns the underlying sender link. */
    public ProtonSender getLink() {
        return this.sender;
    }
}
