fix(query): sleep a little bit when errors occur.
This commit is contained in:
parent
6c14139805
commit
6fafa8443e
|
@ -1228,8 +1228,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
// in case of consumer mismatch, wait for 500ms and retry
|
// in case of consumer mismatch, wait for 500ms and retry
|
||||||
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
||||||
// taosMsleep(500);
|
taosMsleep(500);
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
||||||
|
tscDebug("consumer:0x%" PRIx64" wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId);
|
||||||
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
||||||
if (pRspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
|
@ -1918,7 +1919,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
// in no topic status, delayed task also need to be processed
|
// in no topic status, delayed task also need to be processed
|
||||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
|
||||||
// sleep for a while
|
taosMsleep(500); // sleep for a while
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -637,7 +637,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList));
|
subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList));
|
||||||
|
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
|
||||||
|
|
||||||
// set the update type
|
// set the update type
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
|
|
Loading…
Reference in New Issue