From 6fafa8443e37332ce61a64c21e75b8eeef856a14 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Mar 2023 15:32:00 +0800 Subject: [PATCH] fix(query): sleep a little bit when errors occur. --- source/client/src/clientTmq.c | 5 +++-- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d08cabd27e..f6a2c5fdc1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1228,8 +1228,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { -// taosMsleep(500); + taosMsleep(500); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); + tscDebug("consumer:0x%" PRIx64" wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId); } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { @@ -1918,7 +1919,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); - // sleep for a while + taosMsleep(500); // sleep for a while return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e52b046053..24974a1973 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -637,7 +637,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList)); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); + tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;