From 05bd600ef8847a2ad432beec7aa1f83e13f1e686 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 25 Feb 2023 09:34:37 +0800 Subject: [PATCH 1/6] fix: table scan crash issue --- source/dnode/vnode/src/tsdb/tsdbRead.c | 50 ++++++++++++++++++++++++- tests/parallel_test/cases.task | 3 +- tests/script/tsim/query/partitionby.sim | 39 +++++++++++++++++++ 3 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 tests/script/tsim/query/partitionby.sim diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 655177a362..da6fad6bcd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -155,6 +155,7 @@ typedef struct SBlockInfoBuf { int32_t currentIndex; SArray* pData; int32_t numPerBucket; + int32_t numOfTables; } SBlockInfoBuf; struct STsdbReader { @@ -300,6 +301,47 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { taosArrayPush(pBuf->pData, &p); } + pBuf->numOfTables = numOfTables; + + return TSDB_CODE_SUCCESS; +} + +static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { + if (numOfTables <= pBuf->numOfTables) { + return TSDB_CODE_SUCCESS; + } + + if (pBuf->numOfTables > 0) { + STableBlockScanInfo *p = (STableBlockScanInfo*)taosArrayPop(pBuf->pData); + taosMemoryFree(p); + pBuf->numOfTables /= pBuf->numPerBucket; + } + + int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket; + int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; + if (pBuf->pData == NULL) { + pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); + } + + for (int32_t i = 0; i < num; ++i) { + char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pBuf->pData, &p); + } + + if (remainder > 0) { + char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pBuf->pData, &p); + } + + pBuf->numOfTables = numOfTables; + return TSDB_CODE_SUCCESS; } @@ -3738,8 +3780,12 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n clearBlockScanInfo(*p); } - // todo handle the case where size is less than the value of num - ASSERT(size >= num); + if (size < num) { + int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num); + if (code) { + return code; + } + } taosHashClear(pReader->status.pTableMap); STableUidList* pUidList = &pReader->status.uidList; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 188ab86944..fb15e57fec 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -179,7 +179,8 @@ ,,y,script,./test.sh -f tsim/query/sys_tbname.sim ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim -,,n,script,./test.sh -f tsim/query/join.sim +,,y,script,./test.sh -f tsim/query/join.sim +,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/tsim/query/partitionby.sim b/tests/script/tsim/query/partitionby.sim new file mode 100644 index 0000000000..8babd1aa8d --- /dev/null +++ b/tests/script/tsim/query/partitionby.sim @@ -0,0 +1,39 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +$dbPrefix = db +$tbPrefix1 = tba +$tbPrefix2 = tbb +$mtPrefix = stb +$tbNum = 10 +$rowNum = 2 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt1 = $mtPrefix . $i +$i = 1 +$mt2 = $mtPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db vgroups 3 +sql use $db +sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500)) +sql create table tb0 using $mt1 tags(0, 'a'); +sql create table tb1 using $mt1 tags(1, 'b'); +sql create table tb2 using $mt1 tags(1, 'a'); +sql create table tb3 using $mt1 tags(1, 'a'); +sql create table tb4 using $mt1 tags(3, 'b'); +sql create table tb5 using $mt1 tags(3, 'a'); +sql create table tb6 using $mt1 tags(3, 'b'); +sql create table tb7 using $mt1 tags(3, 'b'); + +sql select * from $mt1 partition by tag1,tag2 limit 1; +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From d6fca036e518c9102113f6988b16047a0b2f3e2d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Feb 2023 11:02:42 +0800 Subject: [PATCH 2/6] refactor: do some internal refactor. --- include/common/ttime.h | 8 +-- source/common/src/ttime.c | 10 ++-- source/dnode/mnode/impl/src/mndConsumer.c | 59 ++++++++++++++--------- source/dnode/vnode/src/tq/tq.c | 29 ++++++----- source/dnode/vnode/src/tq/tqMeta.c | 4 +- source/dnode/vnode/src/tq/tqPush.c | 9 ++-- 6 files changed, 68 insertions(+), 51 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index eaf44c2771..4a7c47d172 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision); -int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); +int32_t taosParseTime(const char* timestr, int64_t* pTime, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); char getPrecisionUnit(int32_t precision); -int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); -int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); +int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision); +int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit); int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal); -void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision); +void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision); #ifdef __cplusplus } diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 559ffd2aaf..7996498d45 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -68,12 +68,12 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui // ==== mktime() kernel code =================// static int64_t m_deltaUtc = 0; -void deltaToUtcInitOnce() { - struct tm tm = {0}; - (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); - m_deltaUtc = (int64_t)taosMktime(&tm); - // printf("====delta:%lld\n\n", seconds); +void deltaToUtcInitOnce() { + struct tm tm = {0}; + (void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm); + m_deltaUtc = (int64_t)taosMktime(&tm); + // printf("====delta:%lld\n\n", seconds); } static int64_t parseFraction(char* str, char** end, int32_t timePrec); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7a1aed903b..e7d75312e4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -557,6 +557,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } +static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) { + int32_t numOfTopics = taosArrayGetSize(pTopicList); + + for (int32_t i = 0; i < numOfTopics; i++) { + char *pOneTopic = taosArrayGetP(pTopicList, i); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + if (pTopic == NULL) { // terrno has been set by callee function + return -1; + } + + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + + mndReleaseTopic(pMnode, pTopic); + } + + return 0; +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -570,11 +591,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumerNew = NULL; int32_t code = -1; - SArray *newSub = subscribe.topicNames; - taosArraySort(newSub, taosArrayCompareString); - taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree); + SArray *pTopicList = subscribe.topicNames; + taosArraySort(pTopicList, taosArrayCompareString); + taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree); - int32_t newTopicNum = taosArrayGetSize(newSub); + int32_t newTopicNum = taosArrayGetSize(pTopicList); // check topic existence STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); @@ -582,34 +603,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - for (int32_t i = 0; i < newTopicNum; i++) { - char *topic = taosArrayGetP(newSub, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { // terrno has been set by callee function - goto _over; - } - - if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - goto _over; - } - - mndReleaseTopic(pMnode, pTopic); + code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user); + if (code != TSDB_CODE_SUCCESS) { + goto _over; } pConsumerOld = mndAcquireConsumer(pMnode, consumerId); if (pConsumerOld == NULL) { - mInfo("receive subscribe request from new consumer:%" PRId64, consumerId); + mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = newSub; + pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance. subscribe.topicNames = NULL; for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = strdup(taosArrayGetP(newSub, i)); + char *newTopicCopy = strdup(taosArrayGetP(pTopicList, i)); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } @@ -621,7 +632,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumerOld->status); - mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d", + mInfo("receive subscribe request from existing consumer:0x%" PRIx64 ", current status: %s, subscribe topic num: %d", consumerId, mndConsumerStatusName(status), newTopicNum); if (status != MQ_CONSUMER_STATUS__READY) { @@ -637,7 +648,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = strdup(taosArrayGetP(newSub, i)); + char *newTopicCopy = strdup(taosArrayGetP(pTopicList, i)); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } @@ -649,7 +660,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { if (i >= oldTopicNum) { - char *newTopicCopy = strdup(taosArrayGetP(newSub, j)); + char *newTopicCopy = strdup(taosArrayGetP(pTopicList, j)); taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); j++; continue; @@ -660,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { continue; } else { char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i); - char *newTopic = taosArrayGetP(newSub, j); + char *newTopic = taosArrayGetP(pTopicList, j); int comp = compareLenPrefixedStr(oldTopic, newTopic); if (comp == 0) { i++; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b311babb0..f31cf97cf9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -334,7 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -495,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // update epoch if need - int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); - while (consumerEpoch < reqEpoch) { - consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + while (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); + savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); } char buf[80]; tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); // 2.reset offset if needed @@ -538,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; @@ -573,6 +574,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); #if 1 + // till now, all data has been rsp to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); @@ -585,8 +587,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pPushEntry->dataRsp.head.epoch = reqEpoch; pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode)); + + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr", + consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode)); // unlock taosWUnLockLatch(&pTq->pushLock); return 0; @@ -599,7 +602,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -658,11 +661,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { walSetReaderCapacity(pHandle->pWalReader, 2048); while (1) { - consumerEpoch = atomic_load_32(&pHandle->epoch); - if (consumerEpoch > reqEpoch) { + savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > reqEpoch) { tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", - consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch); break; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 34f57bc697..095251ab73 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -195,8 +195,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t vlen; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), - pHandle->consumerId, TD_VID(pTq->pVnode)); + tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey, + (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index b9df3e5826..7a356238a0 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -199,7 +199,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); - tqDebug("vgId:%d, offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, + tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); @@ -213,13 +213,14 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); + tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); - tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); if (taosHashGetSize(pTq->pPushMgr) != 0) { + + tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); void* data = taosMemoryMalloc(msgLen); @@ -245,11 +246,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } + if (pPushEntry->dataRsp.reqOffset.version >= ver) { tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver); continue; } + STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; From 10e35f1ed0377eaa2ce56bd91ecb42badea15b74 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 25 Feb 2023 14:15:24 +0800 Subject: [PATCH 3/6] fix: free pointer issue --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index da6fad6bcd..b3d31f2217 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -312,8 +312,8 @@ static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) } if (pBuf->numOfTables > 0) { - STableBlockScanInfo *p = (STableBlockScanInfo*)taosArrayPop(pBuf->pData); - taosMemoryFree(p); + STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData); + taosMemoryFree(*p); pBuf->numOfTables /= pBuf->numPerBucket; } From 50ee40308beb42138499581ad7a5e5764289573c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 25 Feb 2023 16:10:48 +0800 Subject: [PATCH 4/6] fix: realloc uid list --- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b3d31f2217..da6cef7628 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3785,6 +3785,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n if (code) { return code; } + pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num); } taosHashClear(pReader->status.pTableMap); From 04d2bc7d4ba8661312ce146a4c3e4ea9db8b3603 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 25 Feb 2023 17:35:27 +0800 Subject: [PATCH 5/6] fix: add case timeout time --- Jenkinsfile2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 5a0e7972c6..20b586b25a 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -423,7 +423,7 @@ pipeline { echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}" } catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { - timeout(time: 120, unit: 'MINUTES'){ + timeout(time: 130, unit: 'MINUTES'){ pre_test() script { sh ''' From bf5ed1355bb2b9434c8f64ccc20f6108b0b79893 Mon Sep 17 00:00:00 2001 From: dapan1121 <72057773+dapan1121@users.noreply.github.com> Date: Sat, 25 Feb 2023 19:10:47 +0800 Subject: [PATCH 6/6] Update cases.task --- tests/parallel_test/cases.task | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index fb15e57fec..7a98bec96b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -179,7 +179,6 @@ ,,y,script,./test.sh -f tsim/query/sys_tbname.sim ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim -,,y,script,./test.sh -f tsim/query/join.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim