fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL
This commit is contained in:
parent
42a5f69dd1
commit
5cdbebbcd8
|
@ -2356,22 +2356,21 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
|
|||
}
|
||||
|
||||
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||
int32_t code = 0;
|
||||
SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
|
||||
|
||||
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
|
||||
int32_t code = askEp(tmq, NULL, false, true);
|
||||
code = askEp(tmq, NULL, false, true);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
} else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
||||
int32_t code = askEp(tmq, NULL, false, false);
|
||||
code = askEp(tmq, NULL, false, false);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
} else{
|
||||
return pRspWrapper->code;
|
||||
} else if (code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
|
||||
code = 0;
|
||||
}
|
||||
tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
|
||||
tstrerror(pRspWrapper->code));
|
||||
|
@ -2383,6 +2382,7 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||
|
|
|
@ -181,7 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
printResult(tmqmessage);
|
||||
taos_free_result(tmqmessage);
|
||||
} else {
|
||||
ASSERT(taos_errno(NULL) == TSDB_CODE_TIMEOUT_ERROR);
|
||||
ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_POLL_TIMEOUT);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue