From d364397e0d981c575135fd47ebd73d3f23740aed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 19:15:17 +0800 Subject: [PATCH 1/9] fix(stream): fix dead lock caused by refactor. --- source/libs/stream/src/streamCheckpoint.c | 12 +++++++++--- source/libs/stream/src/streamDispatch.c | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4dc7b14cc3..0e89cb003e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -390,6 +390,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -433,6 +434,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -447,6 +449,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -830,6 +833,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + streamMutexUnlock(&pActiveInfo->lock); return; } @@ -938,13 +942,14 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) streamMutexLock(&pInfo->lock); if (!pInfo->dispatchTrigger) { - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return false; } for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -964,11 +969,11 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return true; } - ASSERT(0); + streamMutexUnlock(&pInfo->lock); return false; } @@ -1028,6 +1033,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { + streamMutexUnlock(&pInfo->lock); return num; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4e128ace54..255afb44f9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -907,8 +907,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); SRpcMsg msg = {0}; - int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, - &msg); + int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, + pInfo->checkpointId, &msg); if (code == TSDB_CODE_SUCCESS) { code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg); if (code == TSDB_CODE_SUCCESS) { From 4b9b1e2474ab37dcc80153163fd18e211788a0b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 19:24:41 +0800 Subject: [PATCH 2/9] fix(stream): compare vg replica according to different db. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 111 +++++++++++++------- 1 file changed, 74 insertions(+), 37 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 5a17d659cd..fe124de0be 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -87,15 +87,48 @@ void destroyStreamTaskIter(SStreamTaskIter* pIter) { taosMemoryFree(pIter); } -int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SVgObj *pVgroup = NULL; - int32_t replica = -1; // do the replica check - int32_t code = 0; +static bool checkStatusForEachReplica(SVgObj *pVgroup) { + for (int32_t i = 0; i < pVgroup->replica; ++i) { + if (!pVgroup->vnodeGid[i].syncRestore) { + mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); + return false; + } + + ESyncState state = pVgroup->vnodeGid[i].syncState; + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER || + state == TAOS_SYNC_STATE_CANDIDATE) { + mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId, + state); + return false; + } + } + + return true; +} + +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = 0; + SArray *pVgroupList = NULL; + SHashObj *pHash = NULL; + + pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); + if (pVgroupList == NULL) { + mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno)); + code = terrno; + goto _err; + } + + pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pHash == NULL) { + mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno)); + code = terrno; + goto _err; + } *allReady = true; - SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -106,44 +139,37 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); - if (replica == -1) { - replica = pVgroup->replica; - } else { - if (replica != pVgroup->replica) { - mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", - pVgroup->vgId, pVgroup->replica, replica); - *allReady = false; + int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid)); + if (pReplica == NULL) { // not exist, add it into hash map + code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica)); + if (code) { + mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code)); sdbRelease(pSdb, pVgroup); - break; + goto _err; // take snapshot failed, and not all ready + } + } else { + if (*pReplica != pVgroup->replica) { + mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", + pVgroup->vgId, pVgroup->replica, *pReplica); + *allReady = false; // task snap success, but not all ready } } // if not all ready till now, no need to check the remaining vgroups. + // but still we need to put the info of the existed vgroups into the snapshot list if (*allReady) { - for (int32_t i = 0; i < pVgroup->replica; ++i) { - if (!pVgroup->vnodeGid[i].syncRestore) { - mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); - *allReady = false; - break; - } - - ESyncState state = pVgroup->vnodeGid[i].syncState; - if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER || - state == TAOS_SYNC_STATE_CANDIDATE) { - mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", - pVgroup->vgId, state); - *allReady = false; - break; - } - } + *allReady = checkStatusForEachReplica(pVgroup); } char buf[256] = {0}; - (void) epsetToStr(&entry.epset, buf, tListLen(buf)); + (void)epsetToStr(&entry.epset, buf, tListLen(buf)); - void* p = taosArrayPush(pVgroupList, &entry); + void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); + code = terrno; + sdbRelease(pSdb, pVgroup); + goto _err; } else { mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); } @@ -162,15 +188,19 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); if (code) { sdbRelease(pSdb, pObj); - continue; + mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); + goto _err; } char buf[256] = {0}; - (void) epsetToStr(&entry.epset, buf, tListLen(buf)); + (void)epsetToStr(&entry.epset, buf, tListLen(buf)); - void* p = taosArrayPush(pVgroupList, &entry); + void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { - mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); + code = terrno; + sdbRelease(pSdb, pObj); + mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); + goto _err; } else { mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); } @@ -180,6 +210,13 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { *pList = pVgroupList; return code; + +_err: + *allReady = false; + taosArrayDestroy(pVgroupList); + taosHashCleanup(pHash); + + return code; } int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) { From 02123897b4d6694fb3bda4b2ded1645f891e7f63 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 19:55:26 +0800 Subject: [PATCH 3/9] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 10c6e5bf82..85e9737958 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -208,6 +208,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } *pList = pVgroupList; + taosHashCleanup(pHash); return code; _err: From 3a74d5bd11db00c4c78c3a6c8d4ff6dc10269214 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 09:21:54 +0800 Subject: [PATCH 4/9] fix(stream): cancel fetch --- source/dnode/mnode/impl/src/mndStreamUtil.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 85e9737958..cc3fdd6bb5 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -144,6 +144,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { if (code) { mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code)); sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); goto _err; // take snapshot failed, and not all ready } } else { @@ -168,6 +169,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); code = terrno; sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); goto _err; } else { mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); @@ -187,6 +189,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); if (code) { sdbRelease(pSdb, pObj); + sdbCancelFetch(pSdb, pIter); mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); goto _err; } @@ -198,6 +201,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { if (p == NULL) { code = terrno; sdbRelease(pSdb, pObj); + sdbCancelFetch(pSdb, pIter); mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); goto _err; } else { From 240d2c2a2cb891732da6a9d5a90ee604894eae70 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 09:31:51 +0800 Subject: [PATCH 5/9] fix(stream): fix dead lock caused by refactor. --- source/libs/stream/src/streamCheckpoint.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0e89cb003e..f817447099 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -253,6 +253,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); if (p == NULL) { + streamMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } From c2b6d0da62b3c01dc9e6dcafe6aff6f5de6810a0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 14:15:08 +0800 Subject: [PATCH 6/9] fix(stream): add more check. --- source/libs/stream/src/streamTask.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 86090fed43..a249bad724 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -602,9 +602,9 @@ int32_t streamTaskStop(SStreamTask* pTask) { stError("failed to handle STOP event, s-task:%s", id); } - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - if (code) { + if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle", id); } } From b35ebe61312bfe250788c10a0294a51d56c830b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 15:48:09 +0800 Subject: [PATCH 7/9] fix(stream): set correct return value. --- source/libs/stream/src/streamMeta.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 81640e0050..08cf490b94 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1199,14 +1199,17 @@ void streamMetaWUnLock(SStreamMeta* pMeta) { } int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { - *pList = NULL; + QRY_OPTR_CHECK(pList); + int32_t code = 0; SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); if (pTaskList == NULL) { stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } + *pList = pTaskList; + bool sendMsg = pMeta->sendMsgBeforeClosing; if (!sendMsg) { stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId); @@ -1239,9 +1242,9 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { streamMetaReleaseTask(pMeta, pTask); } - code = streamMetaSendHbHelper(pMeta); + (void)streamMetaSendHbHelper(pMeta); pMeta->sendMsgBeforeClosing = false; - return code; + return TSDB_CODE_SUCCESS; // always return true } void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) { From 90ab86deb6a485f40d8b198e6e0f34f8119d6068 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 15:59:57 +0800 Subject: [PATCH 8/9] fix(stream): remove invalid return code check. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index cc3fdd6bb5..38fddd8bf0 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -636,12 +636,9 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { void *pBuf = NULL; int32_t len = 0; - int32_t code = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - if (code) { - return code; - } + (void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); + int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); if (code) { return code; } From 55b06746de7a14410c757c0c14aea6b346fe3ec5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 22:22:22 +0800 Subject: [PATCH 9/9] fix(test): update a test case. --- tests/system-test/8-stream/stream_multi_agg.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index 1386814f0c..ede3ef427a 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -41,8 +41,9 @@ class TDTestCase: time.sleep(10) tdSql.execute("use test", queryTimes=100) tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") - tdLog.debug("========create stream and insert data ok========") + time.sleep(5) + tdLog.debug("========create stream and insert data ok========") tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") rowCnt = tdSql.getRows() results_meters = tdSql.queryResult