From dc1ea9f9a15ad7ea8281a0640699937880d51f92 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 24 Jan 2024 16:23:47 +0800 Subject: [PATCH] fix(stream): clear htask info when unregistering the task. --- source/dnode/mnode/impl/src/mndStream.c | 8 +++++++- source/libs/stream/src/streamMeta.c | 9 +++++++-- source/libs/stream/src/streamTask.c | 12 ++++++------ tests/system-test/2-query/select_null.py | 3 ++- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 696daca918..b8e0126650 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -85,6 +85,7 @@ static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbNam static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static void freeCheckpointCandEntry(void *); +static void freeTaskList(void *param); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -154,6 +155,7 @@ int32_t mndInitStream(SMnode *pMnode) { execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); + taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; @@ -3036,6 +3038,11 @@ void freeCheckpointCandEntry(void *param) { taosMemoryFreeClear(pEntry->pName); } +void freeTaskList(void* param) { + SArray** pList = (SArray **)param; + taosArrayDestroy(*pList); +} + SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; @@ -3111,7 +3118,6 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); // remove this entry - taosArrayDestroy(*pReqTaskList); taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 87c558a99e..331cf60077 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -467,7 +467,6 @@ void streamMetaClear(SStreamMeta* pMeta) { } taosRemoveRef(streamBackendId, pMeta->streamBackendRid); - taosHashClear(pMeta->pTasksMap); taosArrayClear(pMeta->pTaskList); @@ -505,7 +504,9 @@ void streamMetaCloseImpl(void* arg) { return; } + streamMetaWLock(pMeta); streamMetaClear(pMeta); + streamMetaWUnLock(pMeta); tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); @@ -519,7 +520,6 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->pUpdateTaskSet); - // taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); @@ -534,6 +534,8 @@ void streamMetaCloseImpl(void* arg) { bkdMgtDestroy(pMeta->bkdChkptMgt); pMeta->role = NODE_ROLE_UNINIT; + taosThreadRwlockDestroy(&pMeta->lock); + taosMemoryFree(pMeta); stDebug("end to close stream meta"); } @@ -731,6 +733,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // it is an fill-history task, remove the related stream task's id that points to it atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + if (pTask->info.fillHistory == 1) { + streamTaskClearHTaskAttr(pTask, false); + } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0c671ccc6f..66d34d8712 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -346,9 +346,9 @@ void tFreeStreamTask(SStreamTask* pTask, bool metaLock) { STaskExecStatisInfo* pStatis = &pTask->execInfo; // check for mnode - if (pTask->pMeta != NULL) { - streamTaskClearHTaskAttr(pTask, metaLock); - } +// if (pTask->pMeta != NULL) { +// streamTaskClearHTaskAttr(pTask, metaLock); +// } ETaskStatus status1 = TASK_STATUS__UNINIT; taosThreadMutexLock(&pTask->lock); @@ -751,13 +751,13 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) { SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); if (ppStreamTask != NULL) { + stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, + (int32_t)sTaskId.taskId); + taosThreadMutexLock(&(*ppStreamTask)->lock); CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); streamMetaSaveTask(pMeta, *ppStreamTask); taosThreadMutexUnlock(&(*ppStreamTask)->lock); - - stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, - (int32_t)sTaskId.taskId); } if (metaLock) { diff --git a/tests/system-test/2-query/select_null.py b/tests/system-test/2-query/select_null.py index 8411a33a1f..682a98ad19 100755 --- a/tests/system-test/2-query/select_null.py +++ b/tests/system-test/2-query/select_null.py @@ -24,7 +24,8 @@ from util.dnodes import tdDnodes from util.dnodes import * class TDTestCase: - + updatecfgDict = {'debugflag':0,'stdebugFlag': 143 ,"tqDebugflag":135} + def init(self, conn, logSql, replicaVar): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor(), logSql)