/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.Advisor;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.context.Lifecycle;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

public class DefaultPollableMessageSource
implements PollableMessageSource,
Lifecycle {
    private static final Log log = LogFactory.getLog(DefaultPollableMessageSource.class);
    protected static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal();
    private static final DirectChannel DUMMY_CHANNEL = new DirectChannel();
    private final List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();
    private final MessagingTemplate messagingTemplate = new MessagingTemplate();
    private final SmartMessageConverter messageConverter;
    private MessageSource<?> source;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<Object> recoveryCallback;
    private MessageChannel errorChannel;
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private BiConsumer<AttributeAccessor, Message<?>> attributesProvider;
    private boolean running;

    public DefaultPollableMessageSource(@Nullable SmartMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public void setSource(MessageSource<?> source) {
        ProxyFactory pf = new ProxyFactory(source);
        class ReceiveAdvice
        implements MethodInterceptor {
            private final List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();

            ReceiveAdvice() {
            }

            public Object invoke(MethodInvocation invocation) throws Throwable {
                Object result = invocation.proceed();
                if (result instanceof Message) {
                    Message received = (Message)result;
                    for (ChannelInterceptor interceptor : this.interceptors) {
                        received = interceptor.preSend(received, (MessageChannel)DUMMY_CHANNEL);
                        if (received != null) continue;
                        return null;
                    }
                    return received;
                }
                return result;
            }
        }
        ReceiveAdvice advice = new ReceiveAdvice();
        advice.interceptors.addAll(this.interceptors);
        NameMatchMethodPointcutAdvisor sourceAdvisor = new NameMatchMethodPointcutAdvisor((Advice)advice);
        sourceAdvisor.addMethodName("receive");
        pf.addAdvisor((Advisor)sourceAdvisor);
        this.source = (MessageSource)pf.getProxy();
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<Object> recoveryCallback) {
        this.recoveryCallback = (context, cause) -> {
            if (!this.shouldRequeue((Exception)((MessagingException)cause))) {
                return recoveryCallback.recover(context, cause);
            }
            throw (MessagingException)cause;
        };
    }

    public void setErrorChannel(MessageChannel errorChannel) {
        this.errorChannel = errorChannel;
    }

    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull((Object)errorMessageStrategy, (String)"'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    public void setAttributesProvider(BiConsumer<AttributeAccessor, Message<?>> attributesProvider) {
        this.attributesProvider = attributesProvider;
    }

    public void addInterceptor(ChannelInterceptor interceptor) {
        this.interceptors.add(interceptor);
    }

    public void addInterceptor(int index, ChannelInterceptor interceptor) {
        this.interceptors.add(index, interceptor);
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    public synchronized void start() {
        MessageSource<?> messageSource;
        if (!this.running && (messageSource = this.source) instanceof Lifecycle) {
            Lifecycle sourceWithLifecycle = (Lifecycle)messageSource;
            sourceWithLifecycle.start();
        }
        this.running = true;
    }

    public synchronized void stop() {
        MessageSource<?> messageSource;
        if (this.running && (messageSource = this.source) instanceof Lifecycle) {
            Lifecycle sourceWithLifeCycle = (Lifecycle)messageSource;
            sourceWithLifeCycle.stop();
        }
        this.running = false;
    }

    @Override
    public boolean poll(MessageHandler handler) {
        return this.poll(handler, (ParameterizedTypeReference<?>)null);
    }

    @Override
    public boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type) {
        Message<?> message = this.receive(type);
        if (message == null) {
            return false;
        }
        AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message);
        if (ackCallback == null) {
            ackCallback = status -> log.warn((Object)("No AcknowledgementCallback defined. Status: " + status.name() + " " + String.valueOf(message)));
        }
        try {
            this.setAttributesIfNecessary(message);
            if (this.retryTemplate == null) {
                this.handle(message, handler);
            } else {
                try {
                    this.retryTemplate.execute(() -> {
                        this.handle(message, handler);
                        return null;
                    });
                }
                catch (RetryException ex) {
                    if (this.recoveryCallback != null) {
                        AttributeAccessor attributeAccessor = ATTRIBUTES_HOLDER.get();
                        if (attributeAccessor == null) {
                            attributeAccessor = ErrorMessageUtils.getAttributeAccessor(message, null);
                        }
                        this.recoveryCallback.recover(attributeAccessor, ex.getCause());
                    }
                    ReflectionUtils.rethrowRuntimeException((Throwable)ex.getCause());
                }
            }
            boolean ex = true;
            return ex;
        }
        catch (MessagingException e) {
            if (this.retryTemplate == null && !this.shouldRequeue((Exception)((Object)e))) {
                try {
                    this.messagingTemplate.send((Object)this.errorChannel, (Message)this.errorMessageStrategy.buildErrorMessage((Throwable)e, ATTRIBUTES_HOLDER.get()));
                }
                catch (MessagingException e1) {
                    this.requeueOrNack(message, ackCallback, e1);
                }
                boolean e1 = true;
                return e1;
            }
            this.requeueOrNack(message, ackCallback, e);
            boolean e1 = true;
            return e1;
        }
        catch (Exception e) {
            MessageHandlingException messageHandlingException;
            AckUtils.autoNack((AcknowledgmentCallback)ackCallback);
            if (e instanceof MessageHandlingException && (messageHandlingException = (MessageHandlingException)e).getFailedMessage().equals(message)) {
                throw (MessageHandlingException)e;
            }
            throw new MessageHandlingException(message, (Throwable)e);
        }
        finally {
            ATTRIBUTES_HOLDER.remove();
            AckUtils.autoAck((AcknowledgmentCallback)ackCallback);
        }
    }

    private void requeueOrNack(Message<?> message, AcknowledgmentCallback ackCallback, MessagingException e) {
        if (ackCallback.isAcknowledged() || !this.shouldRequeue((Exception)((Object)e))) {
            AckUtils.autoNack((AcknowledgmentCallback)ackCallback);
            if (e.getFailedMessage().equals(message)) {
                throw e;
            }
            throw new MessageHandlingException(message, (Throwable)e);
        }
        AckUtils.requeue((AcknowledgmentCallback)ackCallback);
    }

    protected boolean shouldRequeue(Exception e) {
        boolean requeue = false;
        for (Throwable t = e.getCause(); t != null && !requeue; t = t.getCause()) {
            requeue = t instanceof RequeueCurrentMessageException;
        }
        return requeue;
    }

    private Message<?> receive(ParameterizedTypeReference<?> type) {
        Message message = this.source.receive();
        if (message != null && type != null && this.messageConverter != null) {
            Object payload = FunctionTypeUtils.isTypeCollection((Type)type.getType()) || FunctionTypeUtils.isTypeMap((Type)type.getType()) ? this.messageConverter.fromMessage(message, FunctionTypeUtils.getRawType((Type)type.getType()), (Object)type.getType()) : this.messageConverter.fromMessage(message, (Class)type.getType());
            if (payload == null) {
                throw new MessageConversionException(message, "No converter could convert Message");
            }
            message = MessageBuilder.withPayload((Object)payload).copyHeaders((Map)message.getHeaders()).build();
        }
        return message;
    }

    private void doHandleMessage(MessageHandler handler, Message<?> message) {
        try {
            handler.handleMessage(message);
        }
        catch (Throwable t) {
            throw new MessageHandlingException(message, t);
        }
    }

    private void setAttributesIfNecessary(Message<?> message) {
        boolean needHolder;
        boolean bl = needHolder = this.errorChannel != null || this.retryTemplate != null;
        if (needHolder) {
            AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
            if (attributes == null) {
                attributes = ErrorMessageUtils.getAttributeAccessor(null, null);
                ATTRIBUTES_HOLDER.set(attributes);
            }
            attributes.setAttribute("inputMessage", message);
            if (this.attributesProvider != null) {
                this.attributesProvider.accept(attributes, message);
            }
        }
    }

    private void handle(Message<?> message, MessageHandler handler) {
        this.doHandleMessage(handler, message);
    }

    static {
        DUMMY_CHANNEL.setBeanName("dummy.required.by.nonnull.api");
    }
}

