package io.vertx.proton.streams.impl;

import io.vertx.core.impl.ContextInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonLinkOptions;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.impl.ProtonConnectionImpl;
import io.vertx.proton.streams.Delivery;
import io.vertx.proton.streams.ProtonPublisher;
import io.vertx.proton.streams.ProtonPublisherOptions;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.Target;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:BOOT-INF/lib/vertx-proton-3.9.1.jar:io/vertx/proton/streams/impl/ProtonPublisherImpl.class */
public class ProtonPublisherImpl implements ProtonPublisher<Delivery> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ProtonPublisherImpl.class);
    private static final Symbol SHARED = Symbol.valueOf("shared");
    private static final Symbol GLOBAL = Symbol.valueOf("global");
    private ContextInternal connCtx;
    private final ProtonConnectionImpl conn;
    private AmqpSubscription subscription;
    private ProtonReceiver receiver;
    private int maxOutstandingCredit;
    private boolean durable;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private boolean emitOnConnectionEnd = true;

    /* loaded from: input_file:BOOT-INF/lib/vertx-proton-3.9.1.jar:io/vertx/proton/streams/impl/ProtonPublisherImpl$AmqpSubscription.class */
    public class AmqpSubscription implements Subscription {
        private Subscriber<? super Delivery> subcriber;
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final AtomicBoolean completed = new AtomicBoolean();
        private long outstandingRequests = 0;

        public AmqpSubscription(Subscriber<? super Delivery> subscriber) {
            this.subcriber = subscriber;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean onNextWrapper(Delivery delivery) {
            int min;
            if (this.completed.get() || this.cancelled.get()) {
                ProtonPublisherImpl.LOG.trace("skipped calling onNext, already completed or cancelled");
                return false;
            }
            ProtonPublisherImpl.LOG.trace("calling onNext");
            this.subcriber.onNext(delivery);
            this.outstandingRequests--;
            if (this.cancelled.get()) {
                return true;
            }
            int credit = ProtonPublisherImpl.this.receiver.getCredit();
            if (credit >= ProtonPublisherImpl.this.maxOutstandingCredit * 0.5d || this.outstandingRequests <= credit || (min = ((int) Math.min(this.outstandingRequests, ProtonPublisherImpl.this.maxOutstandingCredit)) - credit) <= 0) {
                return true;
            }
            ProtonPublisherImpl.LOG.trace("Updating credit for outstanding requests: {0}", Integer.valueOf(min));
            flowCreditIfNeeded(min);
            return true;
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            ProtonPublisherImpl.LOG.trace("Request called: {0}", Long.valueOf(j));
            if (j <= 0 && !this.cancelled.get()) {
                ProtonPublisherImpl.LOG.warn("non-positive subscription request, requests must be > 0");
                ProtonPublisherImpl.this.connCtx.runOnContext(r6 -> {
                    indicateError(new IllegalArgumentException("non-positive subscription request, requests must be > 0"));
                });
            } else {
                if (this.cancelled.get()) {
                    return;
                }
                ProtonPublisherImpl.this.connCtx.runOnContext(r11 -> {
                    ProtonPublisherImpl.LOG.trace("Processing request: {0}", Long.valueOf(j));
                    if (j == Long.MAX_VALUE) {
                        this.outstandingRequests = Long.MAX_VALUE;
                    } else {
                        try {
                            this.outstandingRequests = Math.addExact(j, this.outstandingRequests);
                        } catch (ArithmeticException e) {
                            this.outstandingRequests = Long.MAX_VALUE;
                        }
                    }
                    if (this.cancelled.get()) {
                        ProtonPublisherImpl.LOG.trace("Not sending more credit, subscription cancelled since request was originally scheduled");
                    } else {
                        flowCreditIfNeeded(j);
                    }
                });
            }
        }

        private void flowCreditIfNeeded(long j) {
            int min;
            if (ProtonPublisherImpl.this.receiver.getCredit() >= ProtonPublisherImpl.this.maxOutstandingCredit || (min = (int) Math.min(j, ProtonPublisherImpl.this.maxOutstandingCredit - r0)) <= 0) {
                return;
            }
            if (this.completed.get()) {
                ProtonPublisherImpl.LOG.trace("Skipping flowing additional credits as already completed: {0}", Integer.valueOf(min));
            } else {
                ProtonPublisherImpl.LOG.trace("Flowing additional credits : {0}", Integer.valueOf(min));
                ProtonPublisherImpl.this.receiver.flow(min);
            }
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            ProtonPublisherImpl.LOG.trace("Cancel called");
            if (this.cancelled.getAndSet(true)) {
                ProtonPublisherImpl.LOG.trace("Cancel no-op, already called.");
            } else {
                ProtonPublisherImpl.LOG.trace("Cancellation scheduled");
                ProtonPublisherImpl.this.connCtx.runOnContext(r4 -> {
                    ProtonPublisherImpl.LOG.trace("Cancelling");
                    ProtonPublisherImpl.this.receiver.closeHandler(asyncResult -> {
                        indicateCompletion();
                        ProtonPublisherImpl.this.receiver.close();
                    });
                    ProtonPublisherImpl.this.receiver.detachHandler(asyncResult2 -> {
                        indicateCompletion();
                        ProtonPublisherImpl.this.receiver.detach();
                    });
                    if (ProtonPublisherImpl.this.durable) {
                        ProtonPublisherImpl.this.receiver.detach();
                    } else {
                        ProtonPublisherImpl.this.receiver.close();
                    }
                });
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void indicateError(Throwable th) {
            if (this.completed.getAndSet(true)) {
                ProtonPublisherImpl.LOG.trace("indicateError no-op, already completed");
                return;
            }
            Subscriber<? super Delivery> subscriber = this.subcriber;
            this.subcriber = null;
            if (subscriber == null || this.cancelled.get()) {
                ProtonPublisherImpl.LOG.trace("Skipping error indication, no sub or already cancelled");
            } else {
                ProtonPublisherImpl.LOG.trace("Indicating error");
                subscriber.onError(th);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void indicateSubscribed() {
            if (this.completed.get()) {
                ProtonPublisherImpl.LOG.trace("indicateSubscribed no-op, already completed");
                return;
            }
            ProtonPublisherImpl.LOG.trace("Indicating subscribed");
            if (this.subcriber != null) {
                this.subcriber.onSubscribe(this);
            }
        }

        private void indicateCompletion() {
            if (this.completed.getAndSet(true)) {
                ProtonPublisherImpl.LOG.trace("indicateCompletion no-op, already completed");
                return;
            }
            Subscriber<? super Delivery> subscriber = this.subcriber;
            this.subcriber = null;
            boolean z = this.cancelled.get();
            if (subscriber == null || ((this.outstandingRequests <= 0 || !z) && z)) {
                ProtonPublisherImpl.LOG.trace("Skipping completion indication");
            } else {
                ProtonPublisherImpl.LOG.trace("Indicating completion");
                subscriber.onComplete();
            }
        }
    }

    public ProtonPublisherImpl(String str, ProtonConnectionImpl protonConnectionImpl, ProtonPublisherOptions protonPublisherOptions) {
        this.maxOutstandingCredit = 1000;
        this.connCtx = protonConnectionImpl.getContext();
        this.conn = protonConnectionImpl;
        ProtonLinkOptions protonLinkOptions = new ProtonLinkOptions();
        if (protonPublisherOptions.getLinkName() != null) {
            protonLinkOptions.setLinkName(protonPublisherOptions.getLinkName());
        }
        this.receiver = protonConnectionImpl.createReceiver(str, protonLinkOptions);
        this.receiver.setAutoAccept(false);
        this.receiver.setPrefetch(0);
        if (protonPublisherOptions.getMaxOutstandingCredit() > 0) {
            this.maxOutstandingCredit = protonPublisherOptions.getMaxOutstandingCredit();
        }
        Source source = (Source) this.receiver.getSource();
        this.durable = protonPublisherOptions.isDurable();
        if (this.durable) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }
        if (protonPublisherOptions.isDynamic()) {
            source.setAddress(null);
            source.setDynamic(true);
        }
        ArrayList arrayList = new ArrayList();
        if (protonPublisherOptions.isShared()) {
            arrayList.add(SHARED);
        }
        if (protonPublisherOptions.isGlobal()) {
            arrayList.add(GLOBAL);
        }
        if (arrayList.isEmpty()) {
            return;
        }
        source.setCapabilities((Symbol[]) arrayList.toArray(new Symbol[arrayList.size()]));
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super Delivery> subscriber) {
        LOG.trace("Subscribe called");
        Objects.requireNonNull(subscriber, "A subscriber must be supplied");
        if (this.subscribed.getAndSet(true)) {
            throw new IllegalStateException("Only a single susbcriber supported, and subscribe already called.");
        }
        this.subscription = new AmqpSubscription(subscriber);
        this.connCtx.runOnContext(r4 -> {
            this.conn.addEndHandler(r7 -> {
                if (this.emitOnConnectionEnd) {
                    this.subscription.indicateError(new Exception("Connection closed: " + this.conn.getContainer()));
                }
            });
            this.receiver.closeHandler(asyncResult -> {
                this.subscription.indicateError(new Exception("Link closed unexpectedly"));
                this.receiver.close();
            });
            this.receiver.detachHandler(asyncResult2 -> {
                this.subscription.indicateError(new Exception("Link detached unexpectedly"));
                this.receiver.detach();
            });
            this.receiver.openHandler(asyncResult3 -> {
                this.subscription.indicateSubscribed();
            });
            this.receiver.handler((protonDelivery, message) -> {
                if (this.subscription.onNextWrapper(new DeliveryImpl(message, protonDelivery, this.connCtx))) {
                    return;
                }
                protonDelivery.disposition(Released.getInstance(), true);
            });
            this.receiver.open();
        });
    }

    public boolean isEmitOnConnectionEnd() {
        return this.emitOnConnectionEnd;
    }

    public void setEmitOnConnectionEnd(boolean z) {
        this.emitOnConnectionEnd = z;
    }

    public ProtonReceiver getLink() {
        return this.receiver;
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public ProtonPublisher<Delivery> setSource(org.apache.qpid.proton.amqp.transport.Source source) {
        this.receiver.setSource(source);
        return this;
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public org.apache.qpid.proton.amqp.transport.Source getSource() {
        return this.receiver.getSource();
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public ProtonPublisher<Delivery> setTarget(Target target) {
        this.receiver.setTarget(target);
        return this;
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public Target getTarget() {
        return this.receiver.getTarget();
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public org.apache.qpid.proton.amqp.transport.Source getRemoteSource() {
        return this.receiver.getRemoteSource();
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public Target getRemoteTarget() {
        return this.receiver.getRemoteTarget();
    }

    @Override // io.vertx.proton.streams.ProtonPublisher
    public String getRemoteAddress() {
        org.apache.qpid.proton.amqp.transport.Source remoteSource = getRemoteSource();
        if (remoteSource == null) {
            return null;
        }
        return remoteSource.getAddress();
    }
}
