/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaResourceFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerPostProcessor;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class DefaultKafkaProducerFactory<K, V>
extends KafkaResourceFactory
implements ProducerFactory<K, V>,
ApplicationContextAware,
BeanNameAware,
ApplicationListener<ContextStoppedEvent>,
DisposableBean {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class));
    private final Map<String, Object> configs;
    private final AtomicInteger transactionIdSuffix = new AtomicInteger();
    private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<String, BlockingQueue<CloseSafeProducer<K, V>>>();
    private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<String, CloseSafeProducer<K, V>>();
    private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal();
    private final AtomicInteger epoch = new AtomicInteger();
    private final AtomicInteger clientIdCounter = new AtomicInteger();
    private final List<ProducerFactory.Listener<K, V>> listeners = new ArrayList<ProducerFactory.Listener<K, V>>();
    private final List<ProducerPostProcessor<K, V>> postProcessors = new ArrayList<ProducerPostProcessor<K, V>>();
    private Supplier<Serializer<K>> keySerializerSupplier;
    private Supplier<Serializer<V>> valueSerializerSupplier;
    private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
    private ApplicationContext applicationContext;
    private String beanName = "not.managed.by.Spring";
    private boolean producerPerConsumerPartition = true;
    private boolean producerPerThread;
    private long maxAge;
    private volatile String transactionIdPrefix;
    private volatile String clientIdPrefix;
    private volatile CloseSafeProducer<K, V> producer;

    public DefaultKafkaProducerFactory(Map<String, Object> configs) {
        this(configs, () -> null, () -> null);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Serializer<K> keySerializer, @Nullable Serializer<V> valueSerializer) {
        this(configs, () -> keySerializer, () -> valueSerializer);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<Serializer<K>> keySerializerSupplier, @Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
        String txId;
        this.configs = new HashMap<String, Object>(configs);
        this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
        Supplier<Object> supplier = this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;
        if (this.clientIdPrefix == null && configs.get("client.id") instanceof String) {
            this.clientIdPrefix = (String)configs.get("client.id");
        }
        if (StringUtils.hasText((String)(txId = (String)this.configs.get("transactional.id")))) {
            this.setTransactionIdPrefix(txId);
            this.configs.remove("transactional.id");
        }
        this.configs.put("internal.auto.downgrade.txn.commit", true);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
        this.keySerializerSupplier = () -> keySerializer;
    }

    public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
        this.valueSerializerSupplier = () -> valueSerializer;
    }

    public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
        this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
    }

    @Override
    public Duration getPhysicalCloseTimeout() {
        return this.physicalCloseTimeout;
    }

    public final void setTransactionIdPrefix(String transactionIdPrefix) {
        Assert.notNull((Object)transactionIdPrefix, (String)"'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = transactionIdPrefix;
        this.enableIdempotentBehaviour();
    }

    @Override
    @Nullable
    public String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    public void setProducerPerThread(boolean producerPerThread) {
        this.producerPerThread = producerPerThread;
    }

    @Override
    public boolean isProducerPerThread() {
        return this.producerPerThread;
    }

    public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
        this.producerPerConsumerPartition = producerPerConsumerPartition;
    }

    @Override
    public boolean isProducerPerConsumerPartition() {
        return this.producerPerConsumerPartition;
    }

    @Override
    public Supplier<Serializer<K>> getKeySerializerSupplier() {
        return this.keySerializerSupplier;
    }

    @Override
    public Supplier<Serializer<V>> getValueSerializerSupplier() {
        return this.valueSerializerSupplier;
    }

    @Override
    public Map<String, Object> getConfigurationProperties() {
        HashMap<String, Object> configs2 = new HashMap<String, Object>(this.configs);
        this.checkBootstrap(configs2);
        return Collections.unmodifiableMap(configs2);
    }

    @Override
    public List<ProducerFactory.Listener<K, V>> getListeners() {
        return Collections.unmodifiableList(this.listeners);
    }

    @Override
    public List<ProducerPostProcessor<K, V>> getPostProcessors() {
        return Collections.unmodifiableList(this.postProcessors);
    }

    public void setMaxAge(Duration maxAge) {
        this.maxAge = maxAge.toMillis();
    }

    @Override
    public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> overrideProperties) {
        Map<String, Object> producerProperties = new HashMap<String, Object>(this.getConfigurationProperties());
        producerProperties.putAll(overrideProperties);
        producerProperties = this.ensureExistingTransactionIdPrefixInProperties(producerProperties);
        DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<K, V>(producerProperties, this.getKeySerializerSupplier(), this.getValueSerializerSupplier());
        newFactory.setPhysicalCloseTimeout((int)this.getPhysicalCloseTimeout().getSeconds());
        newFactory.setProducerPerConsumerPartition(this.isProducerPerConsumerPartition());
        newFactory.setProducerPerThread(this.isProducerPerThread());
        for (ProducerPostProcessor<K, V> producerPostProcessor : this.getPostProcessors()) {
            newFactory.addPostProcessor(producerPostProcessor);
        }
        for (ProducerFactory.Listener listener : this.getListeners()) {
            newFactory.addListener(listener);
        }
        return newFactory;
    }

    private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<String, Object> producerProperties) {
        String transactionIdPrefix = this.getTransactionIdPrefix();
        if (StringUtils.hasText((String)transactionIdPrefix) && !producerProperties.containsKey("transactional.id")) {
            HashMap<String, Object> producerPropertiesWithTxnId = new HashMap<String, Object>(producerProperties);
            producerPropertiesWithTxnId.put("transactional.id", transactionIdPrefix);
            return producerPropertiesWithTxnId;
        }
        return producerProperties;
    }

    @Override
    public void addListener(ProducerFactory.Listener<K, V> listener) {
        Assert.notNull(listener, (String)"'listener' cannot be null");
        this.listeners.add(listener);
    }

    @Override
    public void addListener(int index, ProducerFactory.Listener<K, V> listener) {
        Assert.notNull(listener, (String)"'listener' cannot be null");
        if (index >= this.listeners.size()) {
            this.listeners.add(listener);
        } else {
            this.listeners.add(index, listener);
        }
    }

    @Override
    public boolean removeListener(ProducerFactory.Listener<K, V> listener) {
        return this.listeners.remove(listener);
    }

    @Override
    public void addPostProcessor(ProducerPostProcessor<K, V> postProcessor) {
        Assert.notNull(postProcessor, (String)"'postProcessor' cannot be null");
        this.postProcessors.add(postProcessor);
    }

    @Override
    public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
        return this.postProcessors.remove(postProcessor);
    }

    @Override
    public void updateConfigs(Map<String, Object> updates) {
        updates.entrySet().forEach(entry -> {
            if (((String)entry.getKey()).equals("transactional.id")) {
                Assert.isTrue((boolean)(entry.getValue() instanceof String), () -> "'transactional.id' must be a String, not a " + entry.getClass().getName());
                Assert.isTrue((boolean)(this.transactionIdPrefix != null ? entry.getValue() != null : entry.getValue() == null), (String)"Cannot change transactional capability");
                this.transactionIdPrefix = (String)entry.getValue();
            } else if (((String)entry.getKey()).equals("client.id")) {
                Assert.isTrue((boolean)(entry.getValue() instanceof String), () -> "'client.id' must be a String, not a " + entry.getClass().getName());
                this.clientIdPrefix = (String)entry.getValue();
            } else {
                this.configs.put((String)entry.getKey(), entry.getValue());
            }
        });
    }

    @Override
    public void removeConfig(String configKey) {
        this.configs.remove(configKey);
    }

    private void enableIdempotentBehaviour() {
        Object previousValue = this.configs.putIfAbsent("enable.idempotence", true);
        if (Boolean.FALSE.equals(previousValue)) {
            LOGGER.debug(() -> "The 'enable.idempotence' is set to false, may result in duplicate messages");
        }
    }

    @Override
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void destroy() {
        CloseSafeProducer<K, V> producerToClose;
        Object object = this;
        synchronized (object) {
            producerToClose = this.producer;
            this.producer = null;
        }
        if (producerToClose != null) {
            producerToClose.closeDelegate(this.physicalCloseTimeout, this.listeners);
        }
        this.cache.values().forEach(queue -> {
            CloseSafeProducer next = (CloseSafeProducer)queue.poll();
            while (next != null) {
                try {
                    next.closeDelegate(this.physicalCloseTimeout, this.listeners);
                }
                catch (Exception e) {
                    LOGGER.error((Throwable)e, (CharSequence)"Exception while closing producer");
                }
                next = (CloseSafeProducer)queue.poll();
            }
        });
        object = this.consumerProducers;
        synchronized (object) {
            this.consumerProducers.forEach((k, v) -> v.closeDelegate(this.physicalCloseTimeout, this.listeners));
            this.consumerProducers.clear();
        }
        this.epoch.incrementAndGet();
    }

    public void onApplicationEvent(ContextStoppedEvent event) {
        if (event.getApplicationContext().equals(this.applicationContext)) {
            this.reset();
        }
    }

    @Override
    public void reset() {
        try {
            this.destroy();
        }
        catch (Exception e) {
            LOGGER.error((Throwable)e, (CharSequence)"Exception while closing producer");
        }
    }

    @Override
    public Producer<K, V> createProducer() {
        return this.createProducer(this.transactionIdPrefix);
    }

    @Override
    public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
        String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
        return this.doCreateProducer(txIdPrefix);
    }

    @Override
    public Producer<K, V> createNonTransactionalProducer() {
        return this.doCreateProducer(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
        if (txIdPrefix != null) {
            if (this.producerPerConsumerPartition) {
                return this.createTransactionalProducerForPartition(txIdPrefix);
            }
            return this.createTransactionalProducer(txIdPrefix);
        }
        if (this.producerPerThread) {
            return this.getOrCreateThreadBoundProducer();
        }
        DefaultKafkaProducerFactory defaultKafkaProducerFactory = this;
        synchronized (defaultKafkaProducerFactory) {
            if (this.producer != null && this.expire(this.producer)) {
                this.producer = null;
            }
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<K, V>(this.createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout, this.beanName, this.epoch.get());
                this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
            }
            return this.producer;
        }
    }

    private Producer<K, V> getOrCreateThreadBoundProducer() {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
        if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || this.expire(tlProducer))) {
            this.closeThreadBoundProducer();
            tlProducer = null;
        }
        if (tlProducer == null) {
            tlProducer = new CloseSafeProducer<K, V>(this.createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout, this.beanName, this.epoch.get());
            for (ProducerFactory.Listener<K, V> listener : this.listeners) {
                listener.producerAdded(tlProducer.clientId, tlProducer);
            }
            this.threadBoundProducers.set(tlProducer);
        }
        return tlProducer;
    }

    protected Producer<K, V> createKafkaProducer() {
        HashMap<String, Object> newConfigs;
        if (this.clientIdPrefix == null) {
            newConfigs = new HashMap<String, Object>(this.configs);
        } else {
            newConfigs = new HashMap<String, Object>(this.configs);
            newConfigs.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        this.checkBootstrap(newConfigs);
        return this.createRawProducer(newConfigs);
    }

    protected Producer<K, V> createTransactionalProducerForPartition() {
        return this.createTransactionalProducerForPartition(this.transactionIdPrefix);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {
        String suffix = TransactionSupport.getTransactionIdSuffix();
        if (suffix == null) {
            return this.createTransactionalProducer(txIdPrefix);
        }
        Map<String, CloseSafeProducer<K, V>> map = this.consumerProducers;
        synchronized (map) {
            CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);
            if (consumerProducer == null || this.expire(consumerProducer)) {
                CloseSafeProducer<K, V> newProducer = this.doCreateTxProducer(txIdPrefix, suffix, this::removeConsumerProducer);
                this.consumerProducers.put(suffix, newProducer);
                return newProducer;
            }
            return consumerProducer;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean removeConsumerProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
        if (producerToRemove.closed) {
            Map<String, CloseSafeProducer<K, V>> map = this.consumerProducers;
            synchronized (map) {
                Iterator<Map.Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
                while (iterator.hasNext()) {
                    if (!iterator.next().getValue().equals(producerToRemove)) continue;
                    iterator.remove();
                    producerToRemove.closeDelegate(timeout, this.listeners);
                    return true;
                }
            }
            return true;
        }
        return false;
    }

    protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
        if (producerToRemove.closed) {
            if (producerToRemove.equals(this.producer)) {
                this.producer = null;
                producerToRemove.closeDelegate(timeout, this.listeners);
            }
            this.threadBoundProducers.remove();
            return true;
        }
        return false;
    }

    protected Producer<K, V> createTransactionalProducer() {
        return this.createTransactionalProducer(this.transactionIdPrefix);
    }

    protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
        BlockingQueue<CloseSafeProducer<K, V>> queue = this.getCache(txIdPrefix);
        Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
        CloseSafeProducer cachedProducer = (CloseSafeProducer)queue.poll();
        while (cachedProducer != null && this.expire(cachedProducer)) {
            cachedProducer = (CloseSafeProducer)queue.poll();
        }
        if (cachedProducer == null) {
            return this.doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
        }
        return cachedProducer;
    }

    private boolean expire(CloseSafeProducer<K, V> producer) {
        boolean expired;
        boolean bl = expired = this.maxAge > 0L && System.currentTimeMillis() - producer.created > this.maxAge;
        if (expired) {
            producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
        }
        return expired;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
        if (producerToRemove.closed) {
            producerToRemove.closeDelegate(timeout, this.listeners);
            return true;
        }
        Map<String, BlockingQueue<CloseSafeProducer<K, V>>> map = this.cache;
        synchronized (map) {
            BlockingQueue<CloseSafeProducer<K, V>> txIdCache = this.getCache(producerToRemove.txIdPrefix);
            if (producerToRemove.epoch != this.epoch.get() || txIdCache != null && !txIdCache.contains(producerToRemove) && !txIdCache.offer(producerToRemove)) {
                producerToRemove.closeDelegate(timeout, this.listeners);
                return true;
            }
        }
        return false;
    }

    private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix, BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
        HashMap<String, Object> newProducerConfigs = new HashMap<String, Object>(this.configs);
        String txId = prefix + suffix;
        newProducerConfigs.put("transactional.id", txId);
        if (this.clientIdPrefix != null) {
            newProducerConfigs.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        this.checkBootstrap(newProducerConfigs);
        Producer<K, V> newProducer = this.createRawProducer(newProducerConfigs);
        try {
            newProducer.initTransactions();
        }
        catch (RuntimeException ex) {
            try {
                newProducer.close(this.physicalCloseTimeout);
            }
            catch (RuntimeException ex2) {
                KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex);
                newEx.addSuppressed(ex2);
                throw newEx;
            }
            throw new KafkaException("initTransactions() failed", ex);
        }
        CloseSafeProducer closeSafeProducer = new CloseSafeProducer(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName, this.epoch.get());
        this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
        return closeSafeProducer;
    }

    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        KafkaProducer kafkaProducer = new KafkaProducer(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
        for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
            kafkaProducer = (Producer)pp.apply(kafkaProducer);
        }
        return kafkaProducer;
    }

    @Nullable
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
        return this.getCache(this.transactionIdPrefix);
    }

    @Nullable
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
        if (txIdPrefix == null) {
            return null;
        }
        return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void closeProducerFor(String suffix) {
        if (this.producerPerConsumerPartition) {
            Map<String, CloseSafeProducer<K, V>> map = this.consumerProducers;
            synchronized (map) {
                CloseSafeProducer<K, V> removed = this.consumerProducers.remove(suffix);
                if (removed != null) {
                    removed.closeDelegate(this.physicalCloseTimeout, this.listeners);
                }
            }
        }
    }

    @Override
    public void closeThreadBoundProducer() {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
        if (tlProducer != null) {
            this.threadBoundProducers.remove();
            tlProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
        }
    }

    protected static class CloseSafeProducer<K, V>
    implements Producer<K, V> {
        private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0L);
        private final Producer<K, V> delegate;
        private final BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer;
        final String txIdPrefix;
        final long created;
        private final Duration closeTimeout;
        final String clientId;
        final int epoch;
        private volatile Exception producerFailed;
        volatile boolean closed;

        CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout, String factoryName, int epoch) {
            this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
        }

        CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {
            Assert.isTrue((!(delegate instanceof CloseSafeProducer) ? 1 : 0) != 0, (String)"Cannot double-wrap a producer");
            this.delegate = delegate;
            this.removeProducer = removeProducer;
            this.txIdPrefix = txIdPrefix;
            this.closeTimeout = closeTimeout;
            Map metrics = delegate.metrics();
            Iterator metricIterator = metrics.keySet().iterator();
            String id = metricIterator.hasNext() ? (String)((MetricName)metricIterator.next()).tags().get("client-id") : "unknown";
            this.clientId = factoryName + "." + id;
            this.created = System.currentTimeMillis();
            this.epoch = epoch;
            LOGGER.debug(() -> "Created new Producer: " + this);
        }

        Producer<K, V> getDelegate() {
            return this.delegate;
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            LOGGER.trace(() -> this.toString() + " send(" + record + ")");
            return this.delegate.send(record);
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> record, final Callback callback) {
            LOGGER.trace(() -> this.toString() + " send(" + record + ")");
            return this.delegate.send(record, new Callback(){

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception instanceof OutOfOrderSequenceException) {
                        producerFailed = exception;
                        this.close(closeTimeout);
                    }
                    callback.onCompletion(metadata, exception);
                }
            });
        }

        public void flush() {
            LOGGER.trace(() -> this.toString() + " flush()");
            this.delegate.flush();
        }

        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        public void initTransactions() {
            this.delegate.initTransactions();
        }

        public void beginTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " beginTransaction()");
            try {
                this.delegate.beginTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error((Throwable)e, () -> "beginTransaction failed: " + this);
                this.producerFailed = e;
                throw e;
            }
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
            LOGGER.trace(() -> this.toString() + " sendOffsetsToTransaction(" + offsets + ", " + consumerGroupId + ")");
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
            LOGGER.trace(() -> this.toString() + " sendOffsetsToTransaction(" + offsets + ", " + groupMetadata + ")");
            this.delegate.sendOffsetsToTransaction(offsets, groupMetadata);
        }

        public void commitTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " commitTransaction()");
            try {
                this.delegate.commitTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error((Throwable)e, () -> "commitTransaction failed: " + this);
                this.producerFailed = e;
                throw e;
            }
        }

        public void abortTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " abortTransaction()");
            if (this.producerFailed != null) {
                LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.producerFailed.getMessage() + ": " + this);
            } else {
                try {
                    this.delegate.abortTransaction();
                }
                catch (RuntimeException e) {
                    LOGGER.error((Throwable)e, () -> "Abort failed: " + this);
                    this.producerFailed = e;
                    throw e;
                }
            }
        }

        public void close() {
            this.close(null);
        }

        public void close(@Nullable Duration timeout) {
            LOGGER.trace(() -> this.toString() + " close(" + (timeout == null ? "null" : timeout) + ")");
            if (!this.closed) {
                if (this.producerFailed != null) {
                    LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this);
                    this.closed = true;
                    this.removeProducer.test(this, this.producerFailed instanceof TimeoutException ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT : timeout);
                } else {
                    this.closed = this.removeProducer.test(this, timeout);
                }
            }
        }

        void closeDelegate(Duration timeout, List<ProducerFactory.Listener<K, V>> listeners) {
            this.delegate.close(timeout == null ? this.closeTimeout : timeout);
            listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));
            this.closed = true;
        }

        public String toString() {
            return "CloseSafeProducer [delegate=" + this.delegate + "]";
        }
    }
}

