From c5dfe169a6cce58efacffc311a5090945b310738 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 16 Jun 2023 19:33:03 +0800 Subject: [PATCH 1/6] fix:error --- include/libs/wal/wal.h | 2 +- source/dnode/mnode/impl/src/mndDef.c | 2 ++ source/dnode/vnode/src/tq/tqMeta.c | 2 +- source/libs/wal/src/walRef.c | 16 ++++++++-------- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 47230bc95c..a4c1e6a648 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -214,7 +214,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); void walRefFirstVer(SWal *, SWalRef *); void walRefLastVer(SWal *, SWalRef *); -void walRefCommitVer(SWal *, SWalRef *); +//void walRefCommitVer(SWal *, SWalRef *); SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 09c4053f93..6b281fca6b 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -630,6 +630,8 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { } } buf = taosDecodeString(buf, &pSub->qmsg); + }else{ + pSub->qmsg = taosStrdup(""); } return (void *)buf; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 3b0e6749c2..900bb94121 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -379,7 +379,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); } - handle->snapshotVer = walGetLastVer(pTq->pVnode->pWal); + handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); if(buildHandle(pTq, handle) < 0){ return -1; diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 2f1bcfee83..fd1d51a4f1 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -81,11 +81,11 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) { wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); } -void walRefCommitVer(SWal *pWal, SWalRef *pRef) { - taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetCommittedVer(pWal); - pRef->refVer = ver; - - taosThreadMutexUnlock(&pWal->mutex); - wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); -} +//void walRefCommitVer(SWal *pWal, SWalRef *pRef) { +// taosThreadMutexLock(&pWal->mutex); +// int64_t ver = walGetCommittedVer(pWal); +// pRef->refVer = ver; +// +// taosThreadMutexUnlock(&pWal->mutex); +// wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); +//} From d3dedbe676eccc207cab66e36128a865a662f6ba Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 25 Jun 2023 14:52:45 +0800 Subject: [PATCH 2/6] fix:report current offset in tmq --- source/client/src/clientTmq.c | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e7927cd0ae..31c15158f9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -810,7 +810,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.committedOffset; + offRows->offset = pVg->offsetInfo.currentOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows); @@ -1463,7 +1463,8 @@ CREATE_MSG_FAIL: } typedef struct SVgroupSaveInfo { - STqOffsetVal offset; + STqOffsetVal currentOffset; + STqOffsetVal commitOffset; int64_t numOfRows; } SVgroupSaveInfo; @@ -1488,12 +1489,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId); SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); - int64_t numOfRows = 0; STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; - if (pInfo != NULL) { - offsetNew = pInfo->offset; - numOfRows = pInfo->numOfRows; - } SMqClientVg clientVg = { .pollCnt = 0, @@ -1502,11 +1498,11 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, - .numOfRows = numOfRows, + .numOfRows = pInfo ? pInfo->numOfRows : 0, }; - clientVg.offsetInfo.currentOffset = offsetNew; - clientVg.offsetInfo.committedOffset = offsetNew; + clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew; + clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; @@ -1565,7 +1561,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } From 610aa0c8ba00b968730d1c4375286ce3552c8d4c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 25 Jun 2023 15:14:34 +0800 Subject: [PATCH 3/6] fix:modify log --- source/client/src/clientTmq.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 31c15158f9..fad542ce0b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1926,7 +1926,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 - " total:%" PRId64 " reqId:0x%" PRIx64, + ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); @@ -1937,7 +1937,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; pVg->emptyBlockReceiveTs = 0; tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); @@ -2009,7 +2009,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { - tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64, + tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); @@ -2033,7 +2033,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { char buf[TSDB_OFFSET_LEN]; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64, + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); From 5b88dfb90b092b85d1074bf0dbe15848e22e5049 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 25 Jun 2023 15:24:52 +0800 Subject: [PATCH 4/6] fix:remove report offset flag --- source/client/src/clientTmq.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fad542ce0b..8758cec2ec 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -99,7 +99,7 @@ struct tmq_t { // poll info int64_t pollCnt; int64_t totalRows; - bool needReportOffsetRows; +// bool needReportOffsetRows; // timer tmr_h hbLiveTimer; @@ -797,7 +797,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; - if(tmq->needReportOffsetRows){ +// if(tmq->needReportOffsetRows){ req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -816,8 +816,8 @@ void tmqSendHbReq(void* param, void* tmrId) { tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows); } } - tmq->needReportOffsetRows = false; - } +// tmq->needReportOffsetRows = false; +// } int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { @@ -1094,7 +1094,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; - pTmq->needReportOffsetRows = true; +// pTmq->needReportOffsetRows = true; // set conf strcpy(pTmq->clientId, conf->clientId); @@ -2452,7 +2452,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { // if no more waiting rsp pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); taosMemoryFree(pParamSet); - tmq->needReportOffsetRows = true; +// tmq->needReportOffsetRows = true; taosReleaseRef(tmqMgmt.rsetId, refId); return 0; From 1d42603a1bc12c365a0d4ad883eef2ac3c8cd509 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 26 Jun 2023 10:41:46 +0800 Subject: [PATCH 5/6] fix:modify prefix of offset --- source/common/src/tmsg.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e299b088d3..c7e027426e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7168,9 +7168,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } else if (pVal->type == TMQ_OFFSET__RESET_LATEST) { snprintf(buf, maxLen, "latest"); } else if (pVal->type == TMQ_OFFSET__LOG) { - snprintf(buf, maxLen, "log:%" PRId64, pVal->version); + snprintf(buf, maxLen, "wal:%" PRId64, pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "snapshot:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts); + snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts); } else { return TSDB_CODE_INVALID_PARA; } From 330ef571776df74353692ead6c3eb7c826b148f1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 26 Jun 2023 14:28:36 +0800 Subject: [PATCH 6/6] opti:logic --- source/common/src/tmsg.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c7e027426e..4e8797b1ec 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3745,8 +3745,8 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1; int32_t num = taosArrayGetSize(pIndexRsp->pIndex); if (tEncodeI32(&encoder, num) < 0) return -1; - for (int32_t i = 0; i < num; ++i) { - STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, i); + for (int32_t j = 0; j < num; ++j) { + STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, j); if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1; } } @@ -3812,7 +3812,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { tableIndexRsp.pIndex = taosArrayInit(num, sizeof(STableIndexInfo)); if (NULL == tableIndexRsp.pIndex) return -1; STableIndexInfo info; - for (int32_t i = 0; i < num; ++i) { + for (int32_t j = 0; j < num; ++j) { if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1; if (NULL == taosArrayPush(tableIndexRsp.pIndex, &info)) { taosMemoryFree(info.expr);