From a641a07085b2e228e363a312f31899e7e8e9dc24 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 4 Jul 2023 11:13:03 +0800 Subject: [PATCH 1/4] fix:[TD-25071] pSub is null & return wal not exist if no data --- source/dnode/mnode/impl/src/mndConsumer.c | 3 +++ source/libs/wal/src/walRead.c | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 47cc4a1ce7..bdf9931ca2 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -419,6 +419,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); + if(pSub == NULL){ + continue; + } taosWLockLatch(&pSub->lock); SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); if(pConsumerEp){ diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 1223e3756c..786f48ce88 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -82,6 +82,11 @@ int32_t walNextValidMsg(SWalReader *pReader) { ", applied index:%" PRId64", end index:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + if (fetchVer > endVer){ + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; From 11e6856d3ee8c71ccf3ac8a0cd21b13aa842aceb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Jul 2023 13:37:58 +0800 Subject: [PATCH 2/4] fix:init char array --- source/client/src/clientTmq.c | 12 ++++++------ source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 83550aa15d..db54c6e196 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1414,7 +1414,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset); tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); @@ -1559,7 +1559,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); @@ -1788,7 +1788,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->msgType = TDMT_VND_TMQ_CONSUME; int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN]; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, @@ -1925,7 +1925,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; pVg->receivedInfoFromVnode = true; - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 @@ -2033,7 +2033,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2653,7 +2653,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN]; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a293858207..176fe7383f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -510,7 +510,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pHandle->epoch = reqEpoch; } - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a301d82c30..720fc48aba 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -99,7 +99,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand if (pOffset != NULL) { *pOffsetVal = pOffset->val; - char formatBuf[TSDB_OFFSET_LEN]; + char formatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64, From 0928bd551017852790f3e0c4f42f335d59d0c41c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Jul 2023 19:35:48 +0800 Subject: [PATCH 3/4] fix:send rsp offset to client if unregister push mgr --- source/client/src/clientTmq.c | 2 +- source/dnode/vnode/src/inc/tq.h | 3 +- source/dnode/vnode/src/tq/tq.c | 49 ++++++++++++++++++++---------- source/dnode/vnode/src/tq/tqPush.c | 7 +++-- 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 94afbc8644..c5b10b0a4c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1928,7 +1928,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // update the local offset value only for the returned values, only when the local offset is NOT updated // by tmq_offset_seek function if (!pVg->seekUpdated) { - tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set, rsp offset:%d,%"PRId64, tmq->consumerId, pDataRsp->rspOffset.type, pDataRsp->rspOffset.version); pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b35dc71ed9..c390cdfd38 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -151,7 +151,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); -int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); +//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); +int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 176fe7383f..00296ea64b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -232,26 +232,43 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { +int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { + SMqPollReq req = {0}; + if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) { + tqError("tDeserializeSMqPollReq %d failed", pHandle->msg->contLen); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + SMqDataRsp dataRsp = {0}; - dataRsp.head.consumerId = pHandle->consumerId; - dataRsp.head.epoch = pHandle->epoch; - dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - - int64_t sver = 0, ever = 0; - walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, - ever); - - char buf1[TSDB_OFFSET_LEN] = {0}; - char buf2[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); - tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, - dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); + tqInitDataRsp(&dataRsp, &req); + dataRsp.blockNum = 0; + dataRsp.rspOffset = dataRsp.reqOffset; + tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); + tDeleteMqDataRsp(&dataRsp); return 0; } +//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { +// SMqDataRsp dataRsp = {0}; +// dataRsp.head.consumerId = pHandle->consumerId; +// dataRsp.head.epoch = pHandle->epoch; +// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; +// +// int64_t sver = 0, ever = 0; +// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); +// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, +// ever); +// +// char buf1[TSDB_OFFSET_LEN] = {0}; +// char buf2[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); +// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); +// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, +// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); +// return 0; +//} + int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId) { int64_t sver = 0, ever = 0; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 4048ebe3f9..06af53d453 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -64,7 +64,9 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); } else { - tqPushDataRsp(pHandle, vgId); +// tqPushDataRsp(pHandle, vgId); + tqPushEmptyDataRsp(pHandle, vgId); + void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; @@ -89,7 +91,8 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { - tqPushDataRsp(pHandle, vgId); +// tqPushDataRsp(pHandle, vgId); + tqPushEmptyDataRsp(pHandle, vgId); rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); From a5363e6e036db3f22dfd4803e5f472661f3b2f6a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 9 Jul 2023 12:49:50 +0800 Subject: [PATCH 4/4] fix:test case in tmq --- source/dnode/vnode/src/tq/tqUtil.c | 1 + source/libs/parser/src/parInsertSml.c | 4 ++-- tests/system-test/7-tmq/subscribeStb3.py | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 720fc48aba..34c5112eee 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -162,6 +162,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); int code = 0; + terrno = 0; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 0e5ffc57da..78b05b6df5 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -127,7 +127,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){ code = TSDB_CODE_SML_INVALID_DATA; - uError("SML smlBuildCol error col not same %s", pTagSchema->name); + uError("SML smlBuildTagRow error col not same %s", pTagSchema->name); goto end; } @@ -210,7 +210,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 SSmlKv* kv = (SSmlKv*)data; if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){ ret = TSDB_CODE_SML_INVALID_DATA; - uError("SML smlBuildCol error col not same %s", pColSchema->name); + uInfo("SML smlBuildCol error col not same %s", pColSchema->name); goto end; } if (kv->type == TSDB_DATA_TYPE_NCHAR) { diff --git a/tests/system-test/7-tmq/subscribeStb3.py b/tests/system-test/7-tmq/subscribeStb3.py index 6f3230e687..ed44ab1fb1 100644 --- a/tests/system-test/7-tmq/subscribeStb3.py +++ b/tests/system-test/7-tmq/subscribeStb3.py @@ -546,7 +546,7 @@ class TDTestCase: keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ - auto.offset.reset:none' + auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("again start consume processor") @@ -569,7 +569,7 @@ class TDTestCase: keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ - auto.offset.reset:none' + auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("again start consume processor")