From 46574c385fc486e042331a3ce5b521ee25d9c05b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 17 Aug 2023 18:52:33 +0800 Subject: [PATCH 1/3] fix mem leak --- source/dnode/mnode/impl/src/mndStream.c | 91 ++++++++++--------- source/dnode/mnode/sdb/src/sdbRaw.c | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 7 +- 3 files changed, 54 insertions(+), 46 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b296255995..add8a48953 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -26,8 +26,8 @@ #include "mndUser.h" #include "mndVgroup.h" #include "parser.h" -#include "tname.h" #include "tmisce.h" +#include "tname.h" #define MND_STREAM_VER_NUMBER 3 #define MND_STREAM_RESERVE_SIZE 64 @@ -35,14 +35,14 @@ typedef struct SNodeEntry { int32_t nodeId; - SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. - int64_t hbTimestamp; // second + SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. + int64_t hbTimestamp; // second } SNodeEntry; typedef struct SStreamVnodeRevertIndex { - SArray* pDBList; - SArray* pNodeEntryList; - int64_t ts; // snapshot ts + SArray *pDBList; + SArray *pNodeEntryList; + int64_t ts; // snapshot ts } SStreamVnodeRevertIndex; static int32_t mndNodeCheckSentinel = 0; @@ -70,8 +70,8 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int64_t streamId, int32_t taskId); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans); -static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset); +static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); +static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -98,7 +98,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); -// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -173,6 +173,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { if (sver != MND_STREAM_VER_NUMBER) { terrno = 0; + mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); goto STREAM_DECODE_OVER; } @@ -778,16 +779,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } } -// pDb = mndAcquireDb(pMnode, streamObj.sourceDb); -// if (pDb->cfg.replications != 1) { -// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); -// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; -// mndReleaseDb(pMnode, pDb); -// pDb = NULL; -// goto _OVER; -// } + // pDb = mndAcquireDb(pMnode, streamObj.sourceDb); + // if (pDb->cfg.replications != 1) { + // mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); + // terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; + // mndReleaseDb(pMnode, pDb); + // pDb = NULL; + // goto _OVER; + // } -// mndReleaseDb(pMnode, pDb); + // mndReleaseDb(pMnode, pDb); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); if (pTrans == NULL) { @@ -870,7 +871,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; -// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + // tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -1051,7 +1052,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } STransAction action = {0}; - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset); mndReleaseVgroup(pMnode, pVgObj); @@ -1168,6 +1169,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); return -1; } + mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); @@ -1176,6 +1178,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } + //mndTransSetSerial(pTrans); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1742,11 +1745,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } typedef struct SVgroupChangeInfo { - SHashObj* pDBMap; - SArray* pUpdateNodeList; //SArray + SHashObj *pDBMap; + SArray *pUpdateNodeList; // SArray } SVgroupChangeInfo; -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) { +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, int64_t streamId, + int32_t taskId) { pMsg->streamId = streamId; pMsg->taskId = taskId; pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); @@ -1792,7 +1796,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return TSDB_CODE_SUCCESS; } -int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) { +int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) { SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); if (pCommitRaw == NULL) { mError("failed to encode stream since %s", terrstr()); @@ -1816,7 +1820,7 @@ int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) { return 0; } -void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset) { +void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset) { pAction->epSet = *pEpset; pAction->contLen = contLen; pAction->pCont = pCont; @@ -1825,7 +1829,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset -static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { +static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); if (pTrans == NULL) { mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -1889,11 +1893,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr return TSDB_CODE_ACTION_IN_PROGRESS; } -static bool isNodeEpsetChanged(const SEpSet* pPrevEpset, const SEpSet* pCurrent) { - const SEp* pEp = GET_ACTIVE_EP(pPrevEpset); +static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { + const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); - for(int32_t i = 0; i < pCurrent->numOfEps; ++i) { - const SEp* p = &(pCurrent->eps[i]); + for (int32_t i = 0; i < pCurrent->numOfEps; ++i) { + const SEp *p = &(pCurrent->eps[i]); if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { return false; } @@ -1948,12 +1952,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) { +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; - SArray* pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); + SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -1973,7 +1977,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) { return pVgroupListSnapshot; } -int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { +int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { SSdb *pSdb = pMnode->pSdb; // check all streams that involved this vnode should update the epset info @@ -1985,14 +1989,14 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { break; } - void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); - void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); + void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); + void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); if (p == NULL && p1 == NULL) { mndReleaseStream(pMnode, pStream); continue; } - mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid); + mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { // todo @@ -2002,12 +2006,12 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { return 0; } -static SArray* doExtractNodeListFromStream(SMnode *pMnode) { +static SArray *doExtractNodeListFromStream(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; - SHashObj* pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -2023,7 +2027,7 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) { int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - SNodeEntry entry = {0}; + SNodeEntry entry = {0}; epsetAssign(&entry.epset, &pTask->info.epSet); entry.nodeId = pTask->info.nodeId; entry.hbTimestamp = -1; @@ -2033,14 +2037,15 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) { } taosWUnLockLatch(&pStream->lock); + sdbRelease(pSdb, pStream); } - SArray* plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); + SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); // convert to list pIter = NULL; - while((pIter = taosHashIterate(pHash, pIter)) != NULL) { - SNodeEntry* pEntry = (SNodeEntry*) pIter; + while ((pIter = taosHashIterate(pHash, pIter)) != NULL) { + SNodeEntry *pEntry = (SNodeEntry *)pIter; taosArrayPush(plist, pEntry); } taosHashCleanup(pHash); @@ -2094,7 +2099,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } // todo: this process should be executed by the write queue worker of the mnode -//int32_t mndProcessStreamHb(SRpcMsg *pReq) { +// int32_t mndProcessStreamHb(SRpcMsg *pReq) { // SMnode *pMnode = pReq->info.node; // SSdb *pSdb = pMnode->pSdb; // SStreamHbMsg req = {0}; diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 3a16ee3f13..244e50b52e 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -46,7 +46,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { void sdbFreeRaw(SSdbRaw *pRaw) { if (pRaw != NULL) { #if 1 - mTrace("raw:%p, is freed", pRaw); + mTrace("raw:%p, is freed, len:%d, table:%s", pRaw, pRaw->dataLen, sdbTableName(pRaw->type)); #endif taosMemoryFree(pRaw); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 920ec8a254..a760968339 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1412,7 +1412,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); - // taosAcquireRef(streamBackendId, pState->streamBackendRid); + taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); @@ -1495,6 +1495,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { void streamStateCloseBackend(SStreamState* pState, bool remove) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendWrapper* pHandle = wrapper->pBackend; + + qInfo("start to close state on backend: %p", pHandle); + taosThreadMutexLock(&pHandle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { @@ -1505,7 +1508,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadMutexUnlock(&pHandle->cfMutex); char* status[] = {"close", "drop"}; - qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, + qInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, wrapper->idstr); wrapper->remove |= remove; // update by other pState taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId); From 002d5979c119192af50f64f722fb707d0488ad4a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 17 Aug 2023 21:13:03 +0800 Subject: [PATCH 2/3] add trace log --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamState.c | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a760968339..6ab39ce81b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1411,7 +1411,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); + // qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b1ed36de18..26904ebbfb 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -138,7 +138,8 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz int64_t id = *(int64_t*)uniqueId; pState->pTdbState->backendCfWrapperId = id; pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); - + // already exist stream task for + qInfo("already exist stream state for %s", pState->pTdbState->idstr); taosAcquireRef(streamBackendId, pState->streamBackendRid); } taosThreadMutexUnlock(&pMeta->backendMutex); @@ -148,6 +149,8 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); pState->parNameMap = tSimpleHashInit(1024, hashFn); + qInfo("succ to open state %p on backend %p 0x%" PRIx64 "-%d", pState, pMeta->streamBackend, pState->streamId, + pState->taskId); return pState; #else From 80f6d092a1c2b88fe8b162dee4b664f3bb90810e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Aug 2023 08:43:37 +0800 Subject: [PATCH 3/3] add trace log --- source/dnode/mnode/impl/src/mndStream.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ed20492fff..6add3c6a9e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -40,7 +40,6 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamVnodeRevertIndex { - SArray *pDBList; SArray *pNodeEntryList; int64_t ts; // snapshot ts } SStreamVnodeRevertIndex;