From 739485a41970ba5e88eed18f00a13b6b61250e27 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Feb 2024 09:10:04 +0800 Subject: [PATCH] test(stream): add unit test case for the mnode stream module. --- include/libs/stream/streamState.h | 6 ++--- include/libs/stream/tstream.h | 6 ++--- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 26 ++----------------- source/dnode/mnode/impl/src/mndStreamHb.c | 12 ++++++--- source/dnode/mnode/impl/src/mndStreamUtil.c | 24 +++++++++++++++++ source/dnode/mnode/impl/test/CMakeLists.txt | 2 +- .../mnode/impl/test/stream/CMakeLists.txt | 13 ++++++++++ 9 files changed, 56 insertions(+), 36 deletions(-) create mode 100644 source/dnode/mnode/impl/test/stream/CMakeLists.txt diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 24222677a4..c2f7c6de2f 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#ifndef _STREAM_STATE_H_ +#define _STREAM_STATE_H_ + #include "tdatablock.h" #include "rocksdb/c.h" @@ -20,9 +23,6 @@ #include "tsimplehash.h" #include "tstreamFileState.h" -#ifndef _STREAM_STATE_H_ -#define _STREAM_STATE_H_ - #ifdef __cplusplus extern "C" { #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index dce8fffe11..2135bb706b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#ifndef _STREAM_H_ +#define _STREAM_H_ + #include "os.h" #include "streamState.h" #include "tdatablock.h" @@ -26,9 +29,6 @@ extern "C" { #endif -#ifndef _STREAM_H_ -#define _STREAM_H_ - #define ONE_MiB_F (1048576.0) #define ONE_KiB_F (1024.0) #define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index b056d561c7..f2b94aa1d4 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -553,7 +553,7 @@ typedef struct { } SMqConsumerObj; SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 372612274f..4d1125a340 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); +void mndInitExecInfo(); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 18cecddbdb..d51a740b70 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); -static void freeCheckpointCandEntry(void *); -static void freeTaskList(void *param); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); - taosThreadMutexInit(&execInfo.lock, NULL); - _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); - - execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); - execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); - - taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); - taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); + mndInitExecInfo(); if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; @@ -2118,16 +2106,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -void freeCheckpointCandEntry(void *param) { - SCheckpointCandEntry *pEntry = param; - taosMemoryFreeClear(pEntry->pName); -} - -void freeTaskList(void* param) { - SArray** pList = (SArray **)param; - taosArrayDestroy(*pList); -} - static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for(int32_t i = 0; i < num; ++i) { @@ -2203,4 +2181,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 4426ab0672..49a931bba8 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (taosArrayGetSize(pFailedTasks) > 0) { - bool allReady = true; - SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); - taosArrayDestroy(p); + bool allReady = true; + if (pMnode != NULL) { + SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(p); + } else { + allReady = false; + } if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", - pInfo->checkpointId, pInfo->transId); + pInfo->checkpointId, pInfo->transId); mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 235c604b27..3cabce2201 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * taosWUnLockLatch(&pStream->lock); return 0; } + +static void freeCheckpointCandEntry(void *param) { + SCheckpointCandEntry *pEntry = param; + taosMemoryFreeClear(pEntry->pName); +} + +static void freeTaskList(void* param) { + SArray** pList = (SArray **)param; + taosArrayDestroy(*pList); +} + +void mndInitExecInfo() { + taosThreadMutexInit(&execInfo.lock, NULL); + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); + + execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); + execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + + taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); + taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); +} diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index a002b20bde..bc5b5125f1 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -4,7 +4,7 @@ add_subdirectory(acct) #add_subdirectory(db) #add_subdirectory(dnode) add_subdirectory(func) -#add_subdirectory(mnode) +add_subdirectory(stream) add_subdirectory(profile) add_subdirectory(qnode) add_subdirectory(sdb) diff --git a/source/dnode/mnode/impl/test/stream/CMakeLists.txt b/source/dnode/mnode/impl/test/stream/CMakeLists.txt new file mode 100644 index 0000000000..b1bb62735f --- /dev/null +++ b/source/dnode/mnode/impl/test/stream/CMakeLists.txt @@ -0,0 +1,13 @@ +SET(CMAKE_CXX_STANDARD 11) + +aux_source_directory(. MNODE_STREAM_TEST_SRC) +add_executable(streamTest ${MNODE_STREAM_TEST_SRC}) +target_link_libraries( + streamTest + PRIVATE dnode gtest +) + +add_test( + NAME streamTest + COMMAND streamTest +)