From 23ae62d268a01993a476f0b8910a53a8a1c369bc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Nov 2023 11:10:39 +0800 Subject: [PATCH] Merge branch '3.0' into enh/refactorBackend --- source/dnode/mnode/impl/src/mndStream.c | 73 +++++++++---------- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c92dca099b..602b3035b9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -27,9 +27,9 @@ #include "tmisce.h" #include "tname.h" -#define MND_STREAM_VER_NUMBER 4 -#define MND_STREAM_RESERVE_SIZE 64 -#define MND_STREAM_MAX_NUM 60 +#define MND_STREAM_VER_NUMBER 4 +#define MND_STREAM_RESERVE_SIZE 64 +#define MND_STREAM_MAX_NUM 60 #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" #define MND_STREAM_PAUSE_NAME "stream-pause" @@ -77,7 +77,7 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg); +static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode); @@ -85,9 +85,9 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); -static void freeCheckpointCandEntry(void*); +static void freeCheckpointCandEntry(void *); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -168,9 +168,8 @@ void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execInfo.pTaskList); taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); - taosThreadMutexDestroy(&execInfo.lock); - taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pWaitingList); + taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } @@ -333,11 +332,11 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { } } -SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; } -SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; } -int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; } -int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; } -int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; } +SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; } +SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; } +int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; } +int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; } +int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; } static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { int8_t trigger = pStream->conf.trigger; @@ -761,7 +760,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks int32_t numOfStream = 0; SStreamObj *pStream = NULL; - void *pIter = NULL; + void * pIter = NULL; while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { if (pStream->sourceDbUid == pStreamObj->sourceDbUid) { @@ -1087,7 +1086,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre } STransAction act = {0}; - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj); initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); @@ -1265,7 +1264,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { } for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); + STaskId * p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; @@ -1280,13 +1279,13 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { } taosThreadMutexUnlock(&execInfo.lock); - return ready? 0:-1; + return ready ? 0 : -1; } static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; + void * pIter = NULL; SStreamObj *pStream = NULL; int32_t code = 0; @@ -1308,7 +1307,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - void *pIter = NULL; + void * pIter = NULL; int32_t code = 0; taosThreadMutexLock(&execInfo.lock); @@ -1327,7 +1326,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) { SCheckpointCandEntry *pEntry = pIter; SStreamObj *ps = mndAcquireStream(pMnode, pEntry->pName); - mDebug("start to launch checkpoint for stream:%s %"PRIx64" in candidate list", pEntry->pName, pEntry->streamId); + mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId); code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId); mndReleaseStream(pMnode, ps); @@ -2497,13 +2496,13 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { } // kill all trans in the dst DB -static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) { - void* pIter = NULL; - while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { - char* pDb = (char*) pIter; +static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { + void *pIter = NULL; + while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { + char *pDb = (char *)pIter; size_t len = 0; - void* pKey = taosHashGetKey(pDb, &len); + void * pKey = taosHashGetKey(pDb, &len); killActiveCheckpointTrans(pMnode, pKey, len); } } @@ -2652,7 +2651,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) { +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name); if (pTrans == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); @@ -2660,7 +2659,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const return NULL; } - mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg); + mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2740,9 +2739,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) { +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); + SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } @@ -2753,7 +2752,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le return TSDB_CODE_SUCCESS; } - STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId); + STrans *pTrans = mndAcquireTrans(pMnode, pTransInfo->transId); if (pTrans != NULL) { mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName); mndKillTrans(pMnode, pTrans); @@ -2764,7 +2763,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le } int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { - STrans* pTrans = mndAcquireTrans(pMnode, transId); + STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("kill checkpoint transId:%d to reset task status", transId); mndKillTrans(pMnode, pTrans); @@ -2783,8 +2782,8 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); if (conflict) { - mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", - pStream->name, pStream->sourceDb, pStream->targetDb); + mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name, + pStream->sourceDb, pStream->targetDb); continue; } @@ -2928,7 +2927,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } -void freeCheckpointCandEntry(void* param) { - SCheckpointCandEntry* pEntry = param; +void freeCheckpointCandEntry(void *param) { + SCheckpointCandEntry *pEntry = param; taosMemoryFreeClear(pEntry->pName); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f07ef71003..dc61bd25f2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -190,7 +190,7 @@ int32_t getCfIdx(const char* cfName) { } bool isValidCheckpoint(const char* dir) { - return true; + // return true; STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir); if (pDb == NULL) { return true;