From fad23f8cb0932f6198efce18c9bb86af4504df98 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 Mar 2023 14:03:01 +0800 Subject: [PATCH] fix(tmq): adjust log. --- source/client/src/clientTmq.c | 37 ++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 64deb42c76..e95f716313 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1377,6 +1377,11 @@ CREATE_MSG_FAIL: return -1; } +typedef struct SVgroupSaveInfo { + STqOffsetVal offset; + int64_t numOfRows; +} SVgroupSaveInfo; + static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, tmq_t* tmq) { pTopic->schema = pTopicEp->schema; @@ -1396,11 +1401,13 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId); - STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); + SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); + int64_t numOfRows = 0; STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; - if (pOffset != NULL) { - offsetNew = *pOffset; + if (pInfo != NULL) { + offsetNew = pInfo->offset; + numOfRows = pInfo->numOfRows; } SMqClientVg clientVg = { @@ -1411,7 +1418,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, - .numOfRows = 0, + .numOfRows = numOfRows, }; taosArrayPush(pTopic->vgs, &clientVg); @@ -1463,7 +1470,9 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { tFormatOffset(buf, 80, &pVgCur->currentOffset); tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); + + SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; + taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } } @@ -1835,19 +1844,19 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { char buf[80]; tFormatOffset(buf, 80, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { - tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId, - pVg->vgId, buf, pVg->numOfRows, pollRspWrapper->reqId); + tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%"PRId64" total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId, + pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 " reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, - pollRspWrapper->reqId); - tmq->totalRows += numOfRows; + + tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, tmq->totalRows, pVg->numOfRows, + pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); return pRsp; } @@ -1911,9 +1920,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { char buf[80]; tFormatOffset(buf, 80, &pVg->currentOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 " reqId:0x%" PRIx64, + ", vg total:%" PRId64 " total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, - pollRspWrapper->reqId); + tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); return pRsp;