From faa4bbcaca726181d5ebecda1ba1adf31040117b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 23 Aug 2023 14:26:45 +0800 Subject: [PATCH 1/3] fix mem leak --- source/dnode/mnode/impl/src/mndStream.c | 162 +++++++++++++----------- 1 file changed, 85 insertions(+), 77 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3eca61c1dd..53afcc0e3f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -77,12 +77,12 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int64_t streamId, int32_t taskId); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); -static SArray *doExtractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); +static SArray *doExtractNodeListFromStream(SMnode *pMnode); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); +static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); int32_t mndInitStream(SMnode *pMnode) { @@ -129,7 +129,11 @@ int32_t mndInitStream(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupStream(SMnode *pMnode) {} +void mndCleanupStream(SMnode *pMnode) { + taosArrayDestroy(execNodeList.pTaskList); + taosHashCleanup(execNodeList.pTaskMap); + taosThreadMutexDestroy(&execNodeList.lock); +} SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1156,7 +1160,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); taosArrayDestroy(pNodeSnapshot); @@ -1167,12 +1171,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } - {// check if all tasks are in TASK_STATUS__NORMAL status + { // check if all tasks are in TASK_STATUS__NORMAL status bool ready = true; taosThreadMutexLock(&execNodeList.lock); - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i); + for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { + STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); if (p->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); @@ -1197,9 +1201,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); - const char* pDb = mndGetStreamDB(pMnode); + const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, "checkpoint"); - taosMemoryFree((void*)pDb); + taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, @@ -2226,7 +2230,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2234,9 +2238,9 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t j = 0; j < numOfTasks; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); + void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); if (p == NULL) { STaskStatusEntry entry = { .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; @@ -2250,21 +2254,23 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p } // todo: this process should be executed by the write queue worker of the mnode - int32_t mndProcessStreamHb(SRpcMsg *pReq) { +int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; - SDecoder decoder = {0}; + SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + tDecoderClear(&decoder); terrno = TSDB_CODE_INVALID_MSG; return -1; } + tDecoderClear(&decoder); -// int64_t now = taosGetTimestampSec(); + // int64_t now = taosGetTimestampSec(); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execNodeList.lock); @@ -2273,12 +2279,12 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p doExtractTasksFromStream(pMnode); } - for(int32_t i = 0; i < req.numOfTasks; ++i) { - STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + for (int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status)); @@ -2286,64 +2292,66 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p } taosThreadMutexUnlock(&execNodeList.lock); + taosArrayDestroy(req.pTaskStatus); + // bool nodeChanged = false; -// SArray* pList = taosArrayInit(4, sizeof(int32_t)); -/* - // record the timeout node - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); - int64_t duration = now - pEntry->hbTimestamp; - if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next - taosArrayPush(pList, &pEntry); - mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); - continue; - } - - if (pEntry->nodeId != req.vgId) { - continue; - } - - pEntry->hbTimestamp = now; - - // check epset to identify whether the node has been transferred to other dnodes. - // node the epset is changed, which means the node transfer has occurred for this node. -// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { -// nodeChanged = true; -// break; -// } - } - - // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, - // to identify whether the dnode is truely offline or not. - - // handle the node changed case - if (!nodeChanged) { - return TSDB_CODE_SUCCESS; - } - - int32_t nodeId = req.vgId; - - {// check all streams that involved this vnode should update the epset info - SStreamObj *pStream = NULL; - void *pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; + // SArray* pList = taosArrayInit(4, sizeof(int32_t)); + /* + // record the timeout node + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); + int64_t duration = now - pEntry->hbTimestamp; + if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next + taosArrayPush(pList, &pEntry); + mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); + continue; } - // update the related upstream and downstream tasks, todo remove this, no need this function - taosWLockLatch(&pStream->lock); -// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); -// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); - taosWUnLockLatch(&pStream->lock); + if (pEntry->nodeId != req.vgId) { + continue; + } -// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); -// if (code != TSDB_CODE_SUCCESS) { -// todo -//// } -// } - } -*/ + pEntry->hbTimestamp = now; + + // check epset to identify whether the node has been transferred to other dnodes. + // node the epset is changed, which means the node transfer has occurred for this node. + // if (!isEpsetEqual(&pEntry->epset, &req.epset)) { + // nodeChanged = true; + // break; + // } + } + + // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, + // to identify whether the dnode is truely offline or not. + + // handle the node changed case + if (!nodeChanged) { + return TSDB_CODE_SUCCESS; + } + + int32_t nodeId = req.vgId; + + {// check all streams that involved this vnode should update the epset info + SStreamObj *pStream = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + // update the related upstream and downstream tasks, todo remove this, no need this function + taosWLockLatch(&pStream->lock); + // streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); + // streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); + taosWUnLockLatch(&pStream->lock); + + // code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); + // if (code != TSDB_CODE_SUCCESS) { + // todo + //// } + // } + } + */ return TSDB_CODE_SUCCESS; } From 5b621fc0c57b0c91dc061adfbf2e6e73c5d60828 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 23 Aug 2023 16:32:32 +0800 Subject: [PATCH 2/3] fix mem leak --- source/dnode/mnode/impl/src/mndStream.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 53afcc0e3f..2591bf0f36 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -133,6 +133,7 @@ void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execNodeList.pTaskList); taosHashCleanup(execNodeList.pTaskMap); taosThreadMutexDestroy(&execNodeList.lock); + mDebug("mnd stream cleanup"); } SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { @@ -2282,9 +2283,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + int32_t *pIdx = (int32_t *)taosHashGet(execNodeList.pTaskMap, k, sizeof(k)); + if (pIdx == NULL) { + mDebug("s-task:0x%x not found in global execNodeList", p->taskId); + continue; + }; + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *pIdx); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status)); From 1ffc13d3911bce9bcd1523e6da5bbb7aee2fa6d0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 23 Aug 2023 17:02:12 +0800 Subject: [PATCH 3/3] fix mem leak --- source/libs/stream/src/streamBackendRocksdb.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 94c859c669..90619cee44 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -790,14 +790,13 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (wrapper->pHandle[i]) { rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; - size_t len = 0; - char* name = rocksdb_column_family_handle_get_name(p, &len); - // char buf[64] = {0}; - // memcpy(buf, name, len); - // qError("column name: name: %s, len: %d", buf, (int)len); - // taosMemoryFree(name); - taosArrayPush(pHandle, &p); + // size_t len = 0; + // char* name = rocksdb_column_family_handle_get_name(p, &len); + // char buf[64] = {0}; + // memcpy(buf, name, len); + // qError("column name: name: %s, len: %d", buf, (int)len); + // taosMemoryFree(name); } } taosThreadRwlockUnlock(&wrapper->rwLock);