/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.cloud.stream.binder.kafka.streams.DltPublishingContext;
import org.springframework.cloud.stream.binder.kafka.streams.RecordRecoverableProcessor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class DltAwareProcessor<KIn, VIn, KOut, VOut>
extends RecordRecoverableProcessor<KIn, VIn, KOut, VOut> {
    private static final Log LOG = LogFactory.getLog(DltAwareProcessor.class);
    private final String dltDestination;
    private final DltPublishingContext dltPublishingContext;
    private BiConsumer<Record<KIn, VIn>, Exception> processorRecordRecoverer;

    public DltAwareProcessor(Function<Record<KIn, VIn>, Record<KOut, VOut>> delegateFunction, String dltDestination, DltPublishingContext dltPublishingContext) {
        super(delegateFunction);
        Assert.isTrue((boolean)StringUtils.hasText((String)dltDestination), (String)"DLT Destination topic must be provided.");
        this.dltDestination = dltDestination;
        Assert.notNull((Object)dltPublishingContext, (String)"DltSenderContext cannot be null");
        this.dltPublishingContext = dltPublishingContext;
    }

    @Override
    protected BiConsumer<Record<KIn, VIn>, Exception> defaultProcessorRecordRecoverer() {
        return (r, e) -> {
            StreamBridge streamBridge = this.dltPublishingContext.getStreamBridge();
            if (streamBridge != null) {
                Message message = MessageBuilder.withPayload((Object)r.value()).setHeader("kafka_messageKey", r.key()).build();
                LOG.trace((Object)"Recovered from Exception: ", (Throwable)e);
                streamBridge.send(this.dltDestination, (Object)message);
            }
        };
    }
}

