/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import kafka.server.NodeToControllerQueueItem;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0005\u0005-e\u0001\u0002\u000e\u001c\u0001\u0001B\u0001b\r\u0001\u0003\u0002\u0003\u0006I\u0001\u000e\u0005\tu\u0001\u0011\t\u0011)A\u0005w!Aa\b\u0001B\u0001B\u0003%q\b\u0003\u0005D\u0001\t\u0005\t\u0015!\u0003E\u0011!9\u0005A!A!\u0002\u0013A\u0005\u0002C(\u0001\u0005\u0003\u0005\u000b\u0011\u0002)\t\u0011u\u0003!\u0011!Q\u0001\nyCQA\u0019\u0001\u0005\u0002\rDq\u0001\u001c\u0001C\u0002\u0013%Q\u000e\u0003\u0004{\u0001\u0001\u0006IA\u001c\u0005\bw\u0002\u0011\r\u0011\"\u0003}\u0011\u001d\ty\u0001\u0001Q\u0001\nuD!\"!\u0005\u0001\u0001\u0004%\taGA\n\u0011)\tY\u0002\u0001a\u0001\n\u0003Y\u0012Q\u0004\u0005\t\u0003S\u0001\u0001\u0015)\u0003\u0002\u0016!9\u00111\u0007\u0001\u0005\u0002\u0005U\u0002bBA\u001f\u0001\u0011%\u0011q\b\u0005\b\u0003\u000b\u0002A\u0011AA$\u0011\u001d\ti\u0005\u0001C\u0001\u0003\u001fBq!a\u0016\u0001\t\u0003\nI\u0006\u0003\u0005\u0002j\u0001!\taGA6\u0011\u001d\ti\b\u0001C\u0005\u0003\u007fBq!!!\u0001\t\u0003\ny\bC\u0004\u0002\u0004\u0002!\t%a 
\t\u0019\u0005\u0015\u0005\u0001%A\u0001\u0002\u0003%\t!a\"\u0003;9{G-\u001a+p\u0007>tGO]8mY\u0016\u0014(+Z9vKN$H\u000b\u001b:fC\u0012T!\u0001H\u000f\u0002\rM,'O^3s\u0015\u0005q\u0012!B6bM.\f7\u0001A\n\u0004\u0001\u0005j\u0003C\u0001\u0012,\u001b\u0005\u0019#B\u0001\u0013&\u0003\u0011)H/\u001b7\u000b\u0005q1#B\u0001\u0010(\u0015\tA\u0013&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002U\u0005\u0019qN]4\n\u00051\u001a#!F%oi\u0016\u0014(I]8lKJ\u001cVM\u001c3UQJ,\u0017\r\u001a\t\u0003]Ej\u0011a\f\u0006\u0003au\tQ!\u001e;jYNL!AM\u0018\u0003\u000f1{wmZ5oO\u0006!\u0012N\\5uS\u0006dg*\u001a;x_J\\7\t\\5f]R\u0004\"!\u000e\u001d\u000e\u0003YR!a\u000e\u0014\u0002\u000f\rd\u0017.\u001a8ug&\u0011\u0011H\u000e\u0002\f\u0017\u000647.Y\"mS\u0016tG/A\bnKR\fG-\u0019;b+B$\u0017\r^3s!\t)D(\u0003\u0002>m\t)R*\u00198vC2lU\r^1eCR\fW\u000b\u001d3bi\u0016\u0014\u0018AF2p]R\u0014x\u000e\u001c7fe:{G-\u001a)s_ZLG-\u001a:\u0011\u0005\u0001\u000bU\"A\u000e\n\u0005\t[\"AF\"p]R\u0014x\u000e\u001c7fe:{G-\u001a)s_ZLG-\u001a:\u0002\r\r|gNZ5h!\t\u0001U)\u0003\u0002G7\tY1*\u00194lC\u000e{gNZ5h\u0003\u0011!\u0018.\\3\u0011\u0005%kU\"\u0001&\u000b\u0005AZ%B\u0001''\u0003\u0019\u0019w.\\7p]&\u0011aJ\u0013\u0002\u0005)&lW-\u0001\u0006uQJ,\u0017\r\u001a(b[\u0016\u0004\"!\u0015.\u000f\u0005IC\u0006CA*W\u001b\u0005!&BA+ \u0003\u0019a$o\\8u})\tq+A\u0003tG\u0006d\u0017-\u0003\u0002Z-\u00061\u0001K]3eK\u001aL!a\u0017/\u0003\rM#(/\u001b8h\u0015\tIf+\u0001\bsKR\u0014\u0018\u0010V5nK>,H/T:\u0011\u0005}\u0003W\"\u0001,\n\u0005\u00054&\u0001\u0002'p]\u001e\fa\u0001P5oSRtD\u0003\u00033fM\u001eD\u0017N[6\u0011\u0005\u0001\u0003\u0001\"B\u001a\t\u0001\u0004!\u0004\"\u0002\u001e\t\u0001\u0004Y\u0004\"\u0002 
\t\u0001\u0004y\u0004\"B\"\t\u0001\u0004!\u0005\"B$\t\u0001\u0004A\u0005\"B(\t\u0001\u0004\u0001\u0006\"B/\t\u0001\u0004q\u0016\u0001\u0004:fcV,7\u000f^)vKV,W#\u00018\u0011\u0007=,x/D\u0001q\u0015\t\t(/\u0001\u0006d_:\u001cWO\u001d:f]RT!\u0001J:\u000b\u0003Q\fAA[1wC&\u0011a\u000f\u001d\u0002\u0014\u0019&t7.\u001a3CY>\u001c7.\u001b8h\t\u0016\fX/\u001a\t\u0003\u0001bL!!_\u000e\u000339{G-\u001a+p\u0007>tGO]8mY\u0016\u0014\u0018+^3vK&#X-\\\u0001\u000ee\u0016\fX/Z:u#V,W/\u001a\u0011\u0002!\u0005\u001cG/\u001b<f\u0007>tGO]8mY\u0016\u0014X#A?\u0011\u000by\f\u0019!a\u0002\u000e\u0003}T1!!\u0001q\u0003\u0019\tGo\\7jG&\u0019\u0011QA@\u0003\u001f\u0005#x.\\5d%\u00164WM]3oG\u0016\u0004B!!\u0003\u0002\f5\t1*C\u0002\u0002\u000e-\u0013AAT8eK\u0006\t\u0012m\u0019;jm\u0016\u001cuN\u001c;s_2dWM\u001d\u0011\u0002\u000fM$\u0018M\u001d;fIV\u0011\u0011Q\u0003\t\u0004?\u0006]\u0011bAA\r-\n9!i\\8mK\u0006t\u0017aC:uCJ$X\rZ0%KF$B!a\b\u0002&A\u0019q,!\t\n\u0007\u0005\rbK\u0001\u0003V]&$\b\"CA\u0014\u001d\u0005\u0005\t\u0019AA\u000b\u0003\rAH%M\u0001\tgR\f'\u000f^3eA!\u001aq\"!\f\u0011\u0007}\u000by#C\u0002\u00022Y\u0013\u0001B^8mCRLG.Z\u0001\u0018C\u000e$\u0018N^3D_:$(o\u001c7mKJ\fE\r\u001a:fgN$\"!a\u000e\u0011\u000b}\u000bI$a\u0002\n\u0007\u0005mbK\u0001\u0004PaRLwN\\\u0001\u0018kB$\u0017\r^3D_:$(o\u001c7mKJ\fE\r\u001a:fgN$B!a\b\u0002B!9\u00111I\tA\u0002\u0005\u001d\u0011a\u00058fo\u0006\u001bG/\u001b<f\u0007>tGO]8mY\u0016\u0014\u0018aB3ocV,W/\u001a\u000b\u0005\u0003?\tI\u0005\u0003\u0004\u0002LI\u0001\ra^\u0001\be\u0016\fX/Z:u\u0003%\tX/Z;f'&TX-\u0006\u0002\u0002RA\u0019q,a\u0015\n\u0007\u0005UcKA\u0002J]R\f\u0001cZ3oKJ\fG/\u001a*fcV,7\u000f^:\u0015\u0005\u0005m\u0003CBA/\u0003?\n\u0019'D\u0001s\u0013\r\t\tG\u001d\u0002\u000b\u0007>dG.Z2uS>t\u0007c\u0001\u0012\u0002f%\u0019\u0011qM\u0012\u00037I+\u0017/^3ti\u0006sGmQ8na2,G/[8o\u0011\u0006tG\r\\3s\u00039A\u0017M\u001c3mKJ+7\u000f]8og\u0016$B!!\u001c\u0002zQ!\u0011qDA8\u0011\u001d\t\t(\u0006a\u0001\u0003g\n\u0001B]3ta>t7/\u001a\t\u0004k\u0005
U\u0014bAA<m\tq1\t\\5f]R\u0014Vm\u001d9p]N,\u0007BBA>+\u0001\u0007q/A\u0005rk\u0016,X-\u0013;f[\u0006\u0011S.Y=cK\u0012K7oY8o]\u0016\u001cG/\u00118e+B$\u0017\r^3D_:$(o\u001c7mKJ$\"!a\b\u0002\r\u0011|wk\u001c:l\u0003\u0015\u0019H/\u0019:u\u0003]\u0001(o\u001c;fGR,G\r\n8fi^|'o[\"mS\u0016tG\u000fF\u00025\u0003\u0013C\u0001\"a\n\u001a\u0003\u0003\u0005\r\u0001\u001a")
public class NodeToControllerRequestThread
extends InterBrokerSendThread
implements Logging {
    // Receives the single-element controller node list in doWork() via setNodes(...).
    private final ManualMetadataUpdater metadataUpdater;
    // Queried at the top of every doWork() pass for the current controller information.
    private final ControllerNodeProvider controllerNodeProvider;
    // Clock used by generateRequests() for expiry checks and request creation timestamps.
    private final Time time;
    // Age in milliseconds at which generateRequests() drops a queued item and calls onTimeout().
    private final long retryTimeoutMs;
    // Pending items: enqueue() appends at the tail, handleResponse() re-inserts retries at the head.
    private final LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue;
    // Cached controller node; holds null while no controller is known.
    private final AtomicReference<Node> activeController;
    // Set to true by start(); enqueue() rejects requests while it is false.
    private volatile boolean started;
    // Logger instance assigned on first use by logger$lzycompute().
    private Logger logger;
    // Log identifier; the constructor sets it from logPrefix.
    private String logIdent;
    // True once `logger` has been assigned (flag for the lazy initialisation above).
    private volatile boolean bitmap$0;

    // ---------------------------------------------------------------------
    // Logging forwarders: each method below delegates to the static
    // implementation of the same name on the Logging interface, passing this
    // instance as the receiver. They contain no logic of their own.
    // ---------------------------------------------------------------------

    /** Delegates to {@code Logging.loggerName$}. */
    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    /** Delegates to {@code Logging.msgWithLogIdent$} with the given message. */
    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    /** Delegates to {@code Logging.trace$}; the message is supplied as a thunk. */
    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    /** Delegates to {@code Logging.trace$} with a message thunk and a throwable thunk. */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    /** Delegates to {@code Logging.isDebugEnabled$}. */
    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    /** Delegates to {@code Logging.isTraceEnabled$}. */
    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Delegates to {@code Logging.debug$}; the message is supplied as a thunk. */
    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    /** Delegates to {@code Logging.debug$} with a message thunk and a throwable thunk. */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    /** Delegates to {@code Logging.info$}; the message is supplied as a thunk. */
    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    /** Delegates to {@code Logging.info$} with a message thunk and a throwable thunk. */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    /** Delegates to {@code Logging.warn$}; the message is supplied as a thunk. */
    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    /** Delegates to {@code Logging.warn$} with a message thunk and a throwable thunk. */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    /** Delegates to {@code Logging.error$}; the message is supplied as a thunk. */
    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    /** Delegates to {@code Logging.error$} with a message thunk and a throwable thunk. */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    /** Delegates to {@code Logging.fatal$}; the message is supplied as a thunk. */
    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    /** Delegates to {@code Logging.fatal$} with a message thunk and a throwable thunk. */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /**
     * Slow path of {@link #logger()}: assigns the {@code logger} field if it has
     * not been assigned yet and returns it.
     *
     * <p>The check and assignment run while holding this instance's monitor, so
     * {@code Logging.logger$} is invoked at most once per instance by this method.
     *
     * @return the value of the {@code logger} field after initialisation
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            // Re-check under the lock: another thread may have initialised the
            // logger between the caller's unsynchronised check and this point.
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                // The flag is written after the field, so a reader that sees the
                // (volatile) flag as true also sees the assigned logger.
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns this thread's logger, creating it on first use.
     *
     * <p>Once the initialisation flag is set the cached field is returned
     * directly; otherwise the work is handed to {@code logger$lzycompute()}.
     *
     * @return the logger instance
     */
    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    /** Returns the current log identifier. */
    @Override
    public String logIdent() {
        return this.logIdent;
    }

    /** Replaces the log identifier; the constructor calls this with {@code logPrefix}. */
    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /**
     * Synthetic accessor that returns the {@code networkClient} field of the
     * given instance (note: of {@code x$1}, not necessarily of {@code this}).
     * It is used by the static lambda body that disconnects from the controller.
     */
    public /* synthetic */ KafkaClient protected$networkClient(NodeToControllerRequestThread x$1) {
        return x$1.networkClient;
    }

    /** Returns the deque of pending queue items. */
    private LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue() {
        return this.requestQueue;
    }

    /** Returns the reference holding the cached controller node (its value may be null). */
    private AtomicReference<Node> activeController() {
        return this.activeController;
    }

    /** Returns whether {@link #start()} has marked this thread as started. */
    public boolean started() {
        return this.started;
    }

    /** Sets the started flag; {@link #start()} calls this with {@code true}. */
    public void started_$eq(boolean x$1) {
        this.started = x$1;
    }

    /**
     * Returns the currently cached controller node, if any.
     *
     * <p>The cached reference is wrapped with {@code Option.apply}, so a
     * {@code null} reference (no controller known) is expected to surface as
     * an empty option rather than as {@code Some(null)}.
     *
     * @return the cached controller node wrapped in an {@code Option}
     */
    public Option<Node> activeControllerAddress() {
        // No (Object) cast here: with the cast the generic call is inferred as
        // Option<Object>, which is not assignable to the declared Option<Node>.
        return Option$.MODULE$.apply(this.activeController().get());
    }

    /**
     * Replaces the cached controller node.
     *
     * @param newActiveController the node to cache; callers in this class pass
     *                            {@code null} to clear the cache
     */
    private void updateControllerAddress(Node newActiveController) {
        this.activeController.set(newActiveController);
    }

    /**
     * Appends a request to the tail of the pending queue and, when a controller
     * is currently cached, wakes the thread up.
     *
     * @param request the queue item to send
     * @throws IllegalStateException if the started flag has not been set yet
     */
    public void enqueue(NodeToControllerQueueItem request) {
        final boolean running = this.started();
        if (!running) {
            throw new IllegalStateException("Cannot enqueue a request if the request thread is not running");
        }
        this.requestQueue.add(request);
        // Only wake the thread when there is a controller to send to.
        final boolean controllerKnown = this.activeControllerAddress().isDefined();
        if (controllerKnown) {
            this.wakeup();
        }
    }

    /**
     * Reports how many items are currently waiting in the request queue.
     *
     * @return the number of queued items
     */
    public int queueSize() {
        return this.requestQueue.size();
    }

    /**
     * Produces at most one request to send on this pass.
     *
     * <p>The queue is walked from the head. Items whose age has reached
     * {@code retryTimeoutMs} are removed and their callback's
     * {@code onTimeout()} is invoked. The first non-expired item is removed and
     * returned as a single-element collection, but only if a controller is
     * currently cached; otherwise it stays queued and the walk continues.
     *
     * @return a single-element collection with the next request, or an empty
     *         list when nothing can be sent
     */
    public Collection<RequestAndCompletionHandler> generateRequests() {
        final long nowMs = this.time.milliseconds();
        for (Iterator<NodeToControllerQueueItem> pending = this.requestQueue.iterator(); pending.hasNext(); ) {
            final NodeToControllerQueueItem item = pending.next();
            final boolean expired = nowMs - item.createdTimeMs() >= this.retryTimeoutMs;
            if (expired) {
                pending.remove();
                item.callback().onTimeout();
            } else {
                // The controller is looked up per item, as it may change while iterating.
                final Option<Node> controller = this.activeControllerAddress();
                if (controller.isDefined()) {
                    pending.remove();
                    final RequestAndCompletionHandler outgoing = new RequestAndCompletionHandler(
                            this.time.milliseconds(),
                            (Node)controller.get(),
                            item.request(),
                            response -> this.handleResponse(item, response));
                    return Collections.singletonList(outgoing);
                }
            }
        }
        return Collections.emptyList();
    }

    /**
     * Handles the response (or failure) for a previously sent queue item.
     *
     * <p>Outcomes, checked in this order:
     * <ol>
     *   <li>authentication error: log, disconnect from and forget the cached
     *       controller, then complete the callback with the response;</li>
     *   <li>version mismatch: log and complete the callback with the response;</li>
     *   <li>disconnected: forget the cached controller and put the item back at
     *       the head of the queue for a retry;</li>
     *   <li>{@code NOT_CONTROLLER} in the response's error counts: disconnect
     *       from and forget the cached controller, then re-queue at the head;</li>
     *   <li>anything else: complete the callback with the response.</li>
     * </ol>
     *
     * @param queueItem the item the response belongs to
     * @param response  the client response to evaluate
     */
    public void handleResponse(NodeToControllerQueueItem queueItem, ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Request " + queueItem.request() + " received " + response);
        if (response.authenticationException() != null) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Request " + queueItem.request() + " failed due to authentication error with controller. Disconnecting the connection to the stale controller " + this.activeControllerAddress().map((Function1 & Serializable)x$1 -> x$1.idString()).getOrElse((Function0 & Serializable)() -> "null"), (Function0<Throwable>)(Function0 & Serializable)() -> response.authenticationException());
            this.maybeDisconnectAndUpdateController();
            queueItem.callback().onComplete(response);
        } else if (response.versionMismatch() != null) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Request " + queueItem.request() + " failed due to unsupported version error", (Function0<Throwable>)(Function0 & Serializable)() -> response.versionMismatch());
            queueItem.callback().onComplete(response);
        } else if (response.wasDisconnected()) {
            this.updateControllerAddress(null);
            // addFirst instead of putFirst: putFirst declares the checked
            // InterruptedException, which this method neither catches nor
            // declares. The queue is created without a capacity bound, so the
            // insert cannot block and the two calls behave the same here.
            this.requestQueue().addFirst(queueItem);
        } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> "Request " + queueItem.request() + " received NOT_CONTROLLER exception. Disconnecting the connection to the stale controller " + this.activeControllerAddress().map((Function1 & Serializable)x$2 -> x$2.idString()).getOrElse((Function0 & Serializable)() -> "null"));
            this.maybeDisconnectAndUpdateController();
            // Head insertion so the retried item is sent before newer ones (see note above on addFirst).
            this.requestQueue().addFirst(queueItem);
        } else {
            queueItem.callback().onComplete(response);
        }
    }

    /**
     * If a controller is currently cached, disconnects from it and clears the
     * cache; does nothing when no controller is cached.
     */
    private void maybeDisconnectAndUpdateController() {
        final Option<Node> cached = this.activeControllerAddress();
        if (cached.isDefined()) {
            NodeToControllerRequestThread.$anonfun$maybeDisconnectAndUpdateController$1(this, (Node)cached.get());
        }
    }

    public void doWork() {
        ControllerInformation controllerInformation = this.controllerNodeProvider.getControllerInfo();
        if (this.activeControllerAddress().isDefined()) {
            super.pollOnce(Long.MAX_VALUE);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Controller isn't cached, looking for local metadata changes");
        Option<Node> option = controllerInformation.node();
        if (option instanceof Some) {
            Node controllerNode = (Node)((Some)option).value();
            this.info((Function0<String>)(Function0 & Serializable)() -> "Recorded new KRaft controller, from now on will use node " + controllerNode);
            this.updateControllerAddress(controllerNode);
            this.metadataUpdater.setNodes(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)controllerNode, (List)Nil$.MODULE$)).asJava());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> "No controller provided, retrying after backoff");
            super.pollOnce(100L);
            return;
        }
        throw new MatchError(option);
    }

    /**
     * Starts the thread via the superclass and then sets the started flag that
     * {@link #enqueue(NodeToControllerQueueItem)} checks.
     */
    public void start() {
        super.start();
        // The flag is set only after super.start() has returned.
        this.started_$eq(true);
    }

    /**
     * Synthetic body of the closure used by
     * {@code maybeDisconnectAndUpdateController()}: disconnects the network
     * client from the given controller node and then clears the cached controller.
     *
     * @param $this             the request thread the closure was created in
     * @param controllerAddress the controller node to disconnect from
     */
    public static final /* synthetic */ void $anonfun$maybeDisconnectAndUpdateController$1(NodeToControllerRequestThread $this, Node controllerAddress) {
        try {
            $this.protected$networkClient($this).disconnect(controllerAddress.idString());
        }
        catch (Throwable t) {
            // A failed disconnect is only logged, so the cache is still cleared below.
            $this.error((Function0<String>)(Function0 & Serializable)() -> "Had an error while disconnecting from NetworkClient.", (Function0<Throwable>)(Function0 & Serializable)() -> t);
        }
        $this.updateControllerAddress(null);
    }

    /**
     * Creates the request thread. The thread is not started and no controller
     * is cached until {@link #start()} and {@link #doWork()} run.
     *
     * @param initialNetworkClient   network client handed to the superclass
     * @param metadataUpdater        updater that is told about the controller node in {@code doWork()}
     * @param controllerNodeProvider source of the current controller information
     * @param config                 broker configuration; only {@code controllerSocketTimeoutMs()} is read here
     * @param time                   clock used for expiry checks and request timestamps
     * @param threadName             name handed to the superclass
     * @param retryTimeoutMs         age in milliseconds after which a queued request is timed out
     */
    public NodeToControllerRequestThread(KafkaClient initialNetworkClient, ManualMetadataUpdater metadataUpdater, ControllerNodeProvider controllerNodeProvider, KafkaConfig config, Time time, String threadName, long retryTimeoutMs) {
        // The superclass constructor call comes first, as Java requires. The
        // decompiled form assigned fields before it, mirroring bytecode order.
        // The third argument is the smaller of the controller socket timeout
        // and retryTimeoutMs, capped at Integer.MAX_VALUE.
        // NOTE(review): presumably the per-request timeout; confirm against InterBrokerSendThread.
        super(threadName, initialNetworkClient, (int)Math.min(Integer.MAX_VALUE, Math.min((long)config.controllerSocketTimeoutMs(), retryTimeoutMs)), time, false);
        this.metadataUpdater = metadataUpdater;
        this.controllerNodeProvider = controllerNodeProvider;
        this.time = time;
        this.retryTimeoutMs = retryTimeoutMs;
        this.logIdent_$eq(this.logPrefix);
        // No capacity argument: the deque is bounded only by Integer.MAX_VALUE.
        this.requestQueue = new LinkedBlockingDeque<>();
        // Starts empty (null): no controller is known yet.
        this.activeController = new AtomicReference<Node>(null);
        this.started = false;
    }
}

