diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index 24222677a4..c2f7c6de2f 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -13,6 +13,9 @@
* along with this program. If not, see .
*/
+#ifndef _STREAM_STATE_H_
+#define _STREAM_STATE_H_
+
#include "tdatablock.h"
#include "rocksdb/c.h"
@@ -20,9 +23,6 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
-#ifndef _STREAM_STATE_H_
-#define _STREAM_STATE_H_
-
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index dce8fffe11..2135bb706b 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,6 +13,9 @@
* along with this program. If not, see .
*/
+#ifndef _STREAM_H_
+#define _STREAM_H_
+
#include "os.h"
#include "streamState.h"
#include "tdatablock.h"
@@ -26,9 +29,6 @@
extern "C" {
#endif
-#ifndef _STREAM_H_
-#define _STREAM_H_
-
#define ONE_MiB_F (1048576.0)
#define ONE_KiB_F (1024.0)
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index b056d561c7..f2b94aa1d4 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -553,7 +553,7 @@ typedef struct {
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
-void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
+void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h
index 372612274f..4d1125a340 100644
--- a/source/dnode/mnode/impl/inc/mndStream.h
+++ b/source/dnode/mnode/impl/inc/mndStream.h
@@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
+void mndInitExecInfo();
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 18cecddbdb..d51a740b70 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
-static void freeCheckpointCandEntry(void *);
-static void freeTaskList(void *param);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
@@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
- taosThreadMutexInit(&execInfo.lock, NULL);
- _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
-
- execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
- execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
- execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
- execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
- execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
-
- taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
- taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
+ mndInitExecInfo();
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
@@ -2118,16 +2106,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
-void freeCheckpointCandEntry(void *param) {
- SCheckpointCandEntry *pEntry = param;
- taosMemoryFreeClear(pEntry->pName);
-}
-
-void freeTaskList(void* param) {
- SArray** pList = (SArray **)param;
- taosArrayDestroy(*pList);
-}
-
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
@@ -2203,4 +2181,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
-}
\ No newline at end of file
+}
diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c
index 4426ab0672..49a931bba8 100644
--- a/source/dnode/mnode/impl/src/mndStreamHb.c
+++ b/source/dnode/mnode/impl/src/mndStreamHb.c
@@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedTasks) > 0) {
- bool allReady = true;
- SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
- taosArrayDestroy(p);
+ bool allReady = true;
+ if (pMnode != NULL) {
+ SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
+ taosArrayDestroy(p);
+ } else {
+ allReady = false;
+ }
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
- pInfo->checkpointId, pInfo->transId);
+ pInfo->checkpointId, pInfo->transId);
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
}
diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c
index 235c604b27..3cabce2201 100644
--- a/source/dnode/mnode/impl/src/mndStreamUtil.c
+++ b/source/dnode/mnode/impl/src/mndStreamUtil.c
@@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
taosWUnLockLatch(&pStream->lock);
return 0;
}
+
+static void freeCheckpointCandEntry(void *param) {
+ SCheckpointCandEntry *pEntry = param;
+ taosMemoryFreeClear(pEntry->pName);
+}
+
+static void freeTaskList(void* param) {
+ SArray** pList = (SArray **)param;
+ taosArrayDestroy(*pList);
+}
+
+void mndInitExecInfo() {
+ taosThreadMutexInit(&execInfo.lock, NULL);
+ _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
+
+ execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
+ execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
+ execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
+ execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
+ execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
+
+ taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
+ taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
+}
diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt
index a002b20bde..bc5b5125f1 100644
--- a/source/dnode/mnode/impl/test/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/CMakeLists.txt
@@ -4,7 +4,7 @@ add_subdirectory(acct)
#add_subdirectory(db)
#add_subdirectory(dnode)
add_subdirectory(func)
-#add_subdirectory(mnode)
+add_subdirectory(stream)
add_subdirectory(profile)
add_subdirectory(qnode)
add_subdirectory(sdb)
diff --git a/source/dnode/mnode/impl/test/stream/CMakeLists.txt b/source/dnode/mnode/impl/test/stream/CMakeLists.txt
new file mode 100644
index 0000000000..b1bb62735f
--- /dev/null
+++ b/source/dnode/mnode/impl/test/stream/CMakeLists.txt
@@ -0,0 +1,13 @@
+SET(CMAKE_CXX_STANDARD 11)
+
+aux_source_directory(. MNODE_STREAM_TEST_SRC)
+add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
+target_link_libraries(
+ streamTest
+ PRIVATE dnode gtest
+)
+
+add_test(
+ NAME streamTest
+ COMMAND streamTest
+)