/*
 * Decompiled with CFR 0.152.
 */
package com.bruce.tool.mq.rocket.core;

import com.bruce.tool.common.exception.BaseRuntimeException;
import com.bruce.tool.mq.rocket.config.RocketConfig;
import com.bruce.tool.mq.rocket.constant.RocketCode;
import com.bruce.tool.mq.rocket.constant.RocketPool;
import com.bruce.tool.mq.rocket.domain.RocketResponse;
import com.bruce.tool.mq.rocket.handler.RocketHandler;
import java.util.Objects;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RocketReceiver {
    @Autowired
    private RocketPool pool;

    public void origin(String code, RocketHandler handler) {
        this.execute(code, handler, true);
    }

    public void execute(String code, RocketHandler handler) {
        this.execute(code, handler, false);
    }

    private void execute(String code, RocketHandler handler, boolean isBase) {
        DefaultMQPushConsumer consumer = this.pool.getReceivers().get(code);
        if (Objects.isNull(consumer)) {
            throw new BaseRuntimeException((Object)RocketCode.MQERROR_RECEIVE_NULL.getCode(), RocketCode.MQERROR_RECEIVE_NULL.getMessage());
        }
        RocketConfig config = this.pool.getConfigs().get(code);
        if (Objects.isNull(config)) {
            throw new BaseRuntimeException((Object)RocketCode.MQERROR_SENDER_NULL.getCode(), RocketCode.MQERROR_SENDER_NULL.getMessage());
        }
        consumer.unsubscribe(config.getTopic());
        consumer.shutdown();
        try {
            consumer.subscribe(config.getTopic(), config.getTags());
            if (isBase) {
                handler.handle(consumer);
                consumer.start();
                return;
            }
            consumer.registerMessageListener((list, context) -> {
                ConsumeConcurrentlyStatus status = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                for (MessageExt message : list) {
                    RocketResponse response = RocketResponse.builder().messageId(message.getMsgId()).topic(message.getTopic()).body(message.getBody()).timestamp(message.getBornTimestamp()).build();
                    status = handler.handle(response);
                }
                return status;
            });
            consumer.start();
        }
        catch (MQClientException e) {
            throw new BaseRuntimeException((Object)RocketCode.MQERROR_RECEIVE_EXCEPTION.getCode(), RocketCode.MQERROR_RECEIVE_EXCEPTION.getMessage());
        }
    }
}

