From 75f9728ab96f6145e23f5bc3360bee9046bac4a2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 10:43:12 +0800 Subject: [PATCH 01/11] fix: supportVnodes config is not in effect --- source/dnode/mnode/impl/src/mndVgroup.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index a6e52caf1f..4a6f0d14da 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -507,7 +507,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup for (int32_t v = 0; v < pVgroup->replica; ++v) { SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pDnode = taosArrayGet(pArray, v); - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } @@ -891,7 +891,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro } if (used) continue; - if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) { + if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } From 06b0fefc385c94c0fba6ac578c796ede1e402747 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 11:15:22 +0800 Subject: [PATCH 02/11] fix: check topic privilege only --- source/dnode/mnode/impl/src/mndConsumer.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 373a653b51..8e2b17a963 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -554,10 +554,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto SUBSCRIBE_OVER; } + // check topic only +#if 0 if (mndCheckDbPrivilegeByName(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { mndReleaseTopic(pMnode, pTopic); goto SUBSCRIBE_OVER; } +#endif if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) { mndReleaseTopic(pMnode, pTopic); From 0b1cef9b51e8a07d7649dcef065ca67d1804300d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Dec 2022 11:29:04 +0800 Subject: [PATCH 03/11] fix udfd crash --- source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transComm.c | 3 +++ 2 files changed, 4 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 060d1d3a98..bc0486bd4c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -285,6 +285,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } destroyCmsg(msg); } + memset(conn->ctx, 0, sizeof(conn->ctx)); } bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index ad8d57c97a..09f9a78ab8 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -282,6 +282,9 @@ void transCtxCleanup(STransCtx* ctx) { } void transCtxMerge(STransCtx* dst, STransCtx* src) { + if (src->args == NULL || src->freeFunc == NULL) { + return; + } if (dst->args == NULL) { dst->args = src->args; dst->brokenVal = src->brokenVal; From 59c48d03962168aa58c12e0e68e7c16d67c2c265 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 11:29:42 +0800 Subject: [PATCH 04/11] test: addjust cases --- tests/script/tsim/dnode/balance2.sim | 10 +++++----- tests/script/tsim/vnode/stable_balance_replica1.sim | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/script/tsim/dnode/balance2.sim b/tests/script/tsim/dnode/balance2.sim index f677d3925c..3f5a42d4d3 100644 --- a/tests/script/tsim/dnode/balance2.sim +++ b/tests/script/tsim/dnode/balance2.sim @@ -4,11 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode5 -i 5 -system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode5 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode4 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode5 -c supportVnodes -v 5 print ========== step1 system sh/exec.sh -n dnode1 -s start diff --git a/tests/script/tsim/vnode/stable_balance_replica1.sim b/tests/script/tsim/vnode/stable_balance_replica1.sim index f124979397..84190a3be0 100644 --- a/tests/script/tsim/vnode/stable_balance_replica1.sim +++ b/tests/script/tsim/vnode/stable_balance_replica1.sim @@ -3,10 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode4 -i 4 -system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 -system sh/cfg.sh -n dnode4 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode1 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 5 +system sh/cfg.sh -n dnode4 -c supportVnodes -v 5 $dbPrefix = br1_db $tbPrefix = br1_tb From 8caca97568711ccff8e2b7eb6677bbd394f5b4ff Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 7 Dec 2022 18:50:35 +0800 Subject: [PATCH 05/11] feat:add delete mark for sma --- source/common/src/tdatablock.c | 4 +- source/libs/executor/src/timewindowoperator.c | 108 +++++++++++------- tests/script/tsim/stream/sliding.sim | 58 +++++++++- 3 files changed, 124 insertions(+), 46 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dfd0b68039..4f726ab194 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1946,9 +1946,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t len = 0; len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 - "|rows:%d|version:%" PRIu64 "\n", + "|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, - pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version); + pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6f1ab8ca65..2af551b832 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -23,6 +23,7 @@ #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; @@ -56,6 +57,7 @@ typedef enum SResultTsInterpType { typedef struct SPullWindowInfo { STimeWindow window; uint64_t groupId; + STimeWindow calWin; } SPullWindowInfo; typedef struct SOpenWindowInfo { @@ -793,17 +795,18 @@ int32_t comparePullWinKey(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SPullWindowInfo* pos = taosArrayGet(res, index); SPullWindowInfo* pData = (SPullWindowInfo*)pKey; - if (pData->window.skey == pos->window.skey) { - if (pData->groupId > pos->groupId) { - return 1; - } else if (pData->groupId < pos->groupId) { - return -1; - } - return 0; - } else if (pData->window.skey > pos->window.skey) { + if (pData->groupId > pos->groupId) { return 1; + } else if (pData->groupId < pos->groupId) { + return -1; } - return -1; + + if (pData->window.skey > pos->window.ekey) { + return 1; + } else if (pData->window.ekey < pos->window.skey) { + return -1; + } + return 0; } static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { @@ -812,10 +815,16 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { if (index == -1) { index = 0; } else { - if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) { - index++; - } else { + int32_t code = comparePullWinKey(pPullInfo, pPullWins, index); + if (code == 0) { + SPullWindowInfo* pos = taosArrayGet(pPullWins ,index); + pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey); + pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey); + pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey); + pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey); return TSDB_CODE_SUCCESS; + } else if (code > 0 ){ + index++; } } if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) { @@ -2255,8 +2264,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false); - colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false); - colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false); + colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false); + colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false); pBlock->info.rows++; } if ((*pIndex) == size) { @@ -2266,27 +2275,33 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB blockDataUpdateTsWindow(pBlock, 0); } -void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { +void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; + SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* tsEndData = (TSKEY*)pEndCol->pData; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); for (int32_t i = 0; i < pBlock->info.rows; i++) { - SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; - void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); - if (chIds) { - SArray* chArray = *(SArray**)chIds; - int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); - if (index != -1) { - qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId); - taosArrayRemove(chArray, index); - if (taosArrayGetSize(chArray) == 0) { - // pull data is over - taosArrayDestroy(chArray); - taosHashRemove(pMap, &winRes, sizeof(SWinKey)); + TSKEY winTs = tsData[i]; + while (winTs < tsEndData[i]) { + SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]}; + void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); + if (chIds) { + SArray* chArray = *(SArray**)chIds; + int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); + if (index != -1) { + qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId); + taosArrayRemove(chArray, index); + if (taosArrayGetSize(chArray) == 0) { + // pull data is over + taosArrayDestroy(chArray); + taosHashRemove(pMap, &winRes, sizeof(SWinKey)); + } } } + winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); } } } @@ -2299,12 +2314,13 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); if (!chIds) { - SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; + SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - savePullWindow(&pull, pInfo->pPullWins); - int32_t size1 = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, winKey, size1); - qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); + if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { + int32_t size1 = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, winKey, size1); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); + } } } } @@ -2374,12 +2390,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p }; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) { - SPullWindowInfo pull = {.window = nextWin, .groupId = groupId}; + SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - savePullWindow(&pull, pInfo->pPullWins); - int32_t size = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, &winRes, size); - qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); + if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, &winRes, size); + qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); + } } else { int32_t index = -1; SArray* chArray = NULL; @@ -2560,7 +2577,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { - processPullOver(pBlock, pInfo->pPullDataMap); + processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval); continue; } @@ -2638,6 +2655,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } +int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { + if (pIntervalPhyNode->window.deleteMark <= 0) { + return DEAULT_DELETE_MARK; + } + int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark); + deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval); + return deleteMark; +} + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -2659,9 +2685,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, - // for test 315360000000 - .deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL, - // .deleteMark = INT64_MAX, + .deleteMark = getDeleteMark(pIntervalPhyNode), .deleteMarkSaved = 0, .calTriggerSaved = 0, }; @@ -4805,7 +4829,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, - .deleteMark = INT64_MAX, + .deleteMark = getDeleteMark(pIntervalPhyNode), }; ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 49e22db1b1..c9a1ddd922 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -461,7 +461,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1); $loop_count = 0 loop2: -sleep 100 +sleep 200 $loop_count = $loop_count + 1 if $loop_count == 10 then @@ -519,7 +519,7 @@ print step 6 $loop_count = 0 loop3: -# sleep 300 +sleep 300 $loop_count = $loop_count + 1 if $loop_count == 10 then @@ -618,6 +618,60 @@ if $data41 != 2 then goto loop4 endi +sql insert into t1 values(1648791343003,4,4,4,3.1); +sql insert into t1 values(1648791213004,4,5,5,4.1); + +loop5: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt3; + +# row 0 +if $rows != 7 then + print =====rows=$rows + goto loop5 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop5 +endi + +if $data11 != 6 then + print =====data11=$data11 + goto loop5 +endi + +if $data21 != 4 then + print =====data21=$data21 + goto loop5 +endi + +if $data31 != 4 then + print =====data31=$data31 + goto loop5 +endi + +if $data41 != 2 then + print =====data41=$data41 + goto loop5 +endi + +if $data51 != 1 then + print =====data51=$data51 + goto loop5 +endi + +if $data61 != 1 then + print =====data61=$data61 + goto loop5 +endi + $loop_all = $loop_all + 1 print ============loop_all=$loop_all From 77829e2676fe58b28d2bc62970fb1c6fd14756d1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Dec 2022 13:23:52 +0800 Subject: [PATCH 06/11] exhandle mem leak --- source/libs/transport/src/transCli.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 060d1d3a98..a6a79ce9e8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -589,6 +589,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { } static int32_t allocConnRef(SCliConn* conn, bool update) { if (update) { + transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; } @@ -697,6 +698,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); + transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -731,6 +733,7 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer = NULL; } + transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); taosMemoryFree(conn->ip); taosMemoryFree(conn->stream); From 52045c43ab3d0a683a5b8b514a0766a1f0a5e30b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Dec 2022 13:30:25 +0800 Subject: [PATCH 07/11] compile error --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bc0486bd4c..879ff7c951 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -285,7 +285,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } destroyCmsg(msg); } - memset(conn->ctx, 0, sizeof(conn->ctx)); + memset(&conn->ctx, 0, sizeof(conn->ctx)); } bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { From 5b8cc2234b249a0299262bbc8a791e65127ac5ef Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 9 Dec 2022 14:38:29 +0800 Subject: [PATCH 08/11] fix: mem leak --- source/client/src/clientMain.c | 16 +++++----- source/client/src/clientTmq.c | 57 +++++++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 8f029e6061..333cfa3dfd 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) { destroyRequest(pRequest); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; - if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); - if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->rsp.blockDataLen); + taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); // taosx taosArrayDestroy(pRsp->rsp.createTableLen); taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); @@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) { taosMemoryFree(pRsp); } else if (TD_RES_TMQ(res)) { SMqRspObj *pRsp = (SMqRspObj *)res; - if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); - if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->rsp.blockDataLen); + taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); pRsp->resInfo.pRspMsg = NULL; doFreeReqResultInfo(&pRsp->resInfo); taosMemoryFree(pRsp); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index db717a4e4e..4352ec69d3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { return 0; } +static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { + // do nothing + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { + SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; + tDeleteSMqAskEpRsp(&pEpRspWrapper->msg); + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->dataRsp.blockDataLen); + taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosMemoryFree(pRsp->metaRsp.metaRsp); + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->taosxRsp.blockDataLen); + taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + // taosx + taosArrayDestroy(pRsp->taosxRsp.createTableLen); + taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); + } +} + void tmqClearUnhandleMsg(tmq_t* tmq) { - SMqRspWrapper* msg = NULL; + SMqRspWrapper* rspWrapper = NULL; while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else + taosGetQitem(tmq->qall, (void**)&rspWrapper); + if (rspWrapper) { + tmqFreeRspWrapper(rspWrapper); + taosFreeQitem(rspWrapper); + } else { break; + } } - msg = NULL; + rspWrapper = NULL; taosReadAllQitems(tmq->mqueue, tmq->qall); while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else + taosGetQitem(tmq->qall, (void**)&rspWrapper); + if (rspWrapper) { + tmqFreeRspWrapper(rspWrapper); + taosFreeQitem(rspWrapper); + } else { break; + } } } @@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) tDeleteSMqAskEpRsp(rspMsg); *pReset = true; } else { + tmqFreeRspWrapper(rspWrapper); *pReset = false; } } else { @@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { @@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { @@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } } else { @@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (1) { tmqHandleAllDelayedTask(tmq); if (tmqPollImpl(tmq, timeout) < 0) { - tscDebug("return since poll err"); + tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId); /*return NULL;*/ } From e150913c7b18d63331c58c69b79a5ffb9b67e794 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Dec 2022 17:08:51 +0800 Subject: [PATCH 09/11] change default sch value --- source/libs/qworker/inc/qwInt.h | 8 ++++---- source/libs/qworker/src/qwUtil.c | 11 +++++++++-- source/libs/qworker/src/qworker.c | 33 +++++++++++++++++-------------- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index af361323a7..9d8fad6fea 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -31,7 +31,7 @@ extern "C" { #define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 -#define QW_DEFAULT_SCH_TASK_NUMBER 10000 +#define QW_DEFAULT_SCH_TASK_NUMBER 3000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 @@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt { #define QW_ERR_RET(c) \ do { \ - int32_t _code = (c); \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ return _code; \ @@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_RET(c) \ do { \ - int32_t _code = (c); \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ } \ @@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_ERR_JRET(c) \ do { \ - code = (c); \ + code = (c); \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ goto _return; \ diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index ce3b493638..f3d073634d 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -281,7 +281,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; - + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { @@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) { int8_t nodeType = mgmt->nodeType; int32_t nodeId = mgmt->nodeId; + int32_t taskCount = 0; + int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); taosTmrStop(mgmt->hbTimer); @@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) { uint64_t qId, tId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); @@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) { qwFreeTaskCtx(ctx); QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); + taskCount++; } taosHashCleanup(mgmt->ctxHash); @@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) { while (pIter) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); + pIter = taosHashIterate(mgmt->schHash, pIter); + schStatusCount++; } taosHashCleanup(mgmt->schHash); @@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) { qwCloseRef(); - qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); + qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt, + taskCount, schStatusCount); } int32_t qwOpenRef(void) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c5db4105d7..81f73b1226 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -8,9 +8,9 @@ #include "qwMsg.h" #include "tcommon.h" #include "tdatablock.h" +#include "tglobal.h" #include "tmsg.h" #include "tname.h" -#include "tglobal.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -117,7 +117,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (queryStop) { *queryStop = true; } - + return TSDB_CODE_SUCCESS; } @@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_ERR_RET(code); } - QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks, + QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); @@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } if (0 == ctx->level) { - QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows, + ctx->level); break; } if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { - QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, + pOutput->numOfRows); break; } } @@ -538,7 +540,7 @@ _return: SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { + if (!rsped) { qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); } } @@ -650,8 +652,8 @@ _return: code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - void *rsp = NULL; - int32_t dataLen = 0; + void *rsp = NULL; + int32_t dataLen = 0; SOutputData sOutput = {0}; if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { return TSDB_CODE_SUCCESS; @@ -671,8 +673,8 @@ _return: qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, - tstrerror(code), dataLen); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); } } @@ -689,7 +691,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { do { ctx = NULL; - + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || + 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1176,7 +1179,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); uint64_t qId, tId; - int32_t eId; + int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; @@ -1186,7 +1189,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { QW_LOCK(QW_WRITE, &ctx->lock); QW_TASK_DLOG_E("start to force stop task"); - + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1194,7 +1197,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { pIter = taosHashIterate(mgmt->ctxHash, pIter); continue; } - + if (QW_QUERY_RUNNING(ctx)) { qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { From a6fff957958d328e582613b8be36110cb0175da0 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 9 Dec 2022 16:51:18 +0800 Subject: [PATCH 10/11] fix: close wal ref --- include/libs/wal/wal.h | 20 ++++++++++---------- source/dnode/vnode/src/tq/tq.c | 14 ++++++++++---- source/libs/wal/src/walMgmt.c | 1 + source/libs/wal/src/walRef.c | 7 +++++-- 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8b9fc84334..a1ae1e429d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -33,16 +33,16 @@ extern "C" { #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }} // clang-format on -#define WAL_PROTO_VER 0 -#define WAL_NOSUFFIX_LEN 20 -#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) -#define WAL_LOG_SUFFIX "log" -#define WAL_INDEX_SUFFIX "idx" -#define WAL_REFRESH_MS 1000 -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL -#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) +#define WAL_PROTO_VER 0 +#define WAL_NOSUFFIX_LEN 20 +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) +#define WAL_LOG_SUFFIX "log" +#define WAL_INDEX_SUFFIX "idx" +#define WAL_REFRESH_MS 1000 +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL +#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) typedef enum { TAOS_WAL_WRITE = 1, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 191fc1b49c..389c8013f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL } taosWUnLockLatch(&pTq->pushLock); - code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + if (pHandle) { + if (pHandle->pRef) { + walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); + } + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + } } code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); @@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { - ASSERT(0); + tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey); } return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index f53088fac6..702d05f576 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -223,6 +223,7 @@ void walClose(SWal *pWal) { taosMemoryFree(pRef); } taosHashCleanup(pWal->pRefHash); + pWal->pRefHash = NULL; taosThreadMutexUnlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index eae5d9f1a7..e86111109c 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) { return pRef; } -#if 1 void walCloseRef(SWal *pWal, int64_t refId) { SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); if (ppRef == NULL) return; SWalRef *pRef = *ppRef; + if (pRef) { + wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); + } else { + wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId); + } taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); taosMemoryFree(pRef); } -#endif int32_t walRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; From 6f663c3035db3c9c828c2f7a3e33056eed9a5443 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Dec 2022 17:21:27 +0800 Subject: [PATCH 11/11] release: build 3.0.2.0 --- cmake/cmake.version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index 9faa7c75dd..b4084bd095 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.1.8") + SET(TD_VER_NUMBER "3.0.2.0") ENDIF () IF (DEFINED VERCOMPATIBLE)