diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index 24222677a4..c2f7c6de2f 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -13,6 +13,9 @@
* along with this program. If not, see .
*/
+#ifndef _STREAM_STATE_H_
+#define _STREAM_STATE_H_
+
#include "tdatablock.h"
#include "rocksdb/c.h"
@@ -20,9 +23,6 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
-#ifndef _STREAM_STATE_H_
-#define _STREAM_STATE_H_
-
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index dce8fffe11..2135bb706b 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,6 +13,9 @@
* along with this program. If not, see .
*/
+#ifndef _STREAM_H_
+#define _STREAM_H_
+
#include "os.h"
#include "streamState.h"
#include "tdatablock.h"
@@ -26,9 +29,6 @@
extern "C" {
#endif
-#ifndef _STREAM_H_
-#define _STREAM_H_
-
#define ONE_MiB_F (1048576.0)
#define ONE_KiB_F (1024.0)
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index bedd72759d..554205decb 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -556,7 +556,7 @@ typedef struct {
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
-void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
+void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h
index 372612274f..4d1125a340 100644
--- a/source/dnode/mnode/impl/inc/mndStream.h
+++ b/source/dnode/mnode/impl/inc/mndStream.h
@@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
+void mndInitExecInfo();
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 47b26fba24..0a78914011 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
-static void freeCheckpointCandEntry(void *);
-static void freeTaskList(void *param);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
@@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
- taosThreadMutexInit(&execInfo.lock, NULL);
- _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
-
- execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
- execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
- execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
- execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
- execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
-
- taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
- taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
+ mndInitExecInfo();
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
@@ -2117,16 +2105,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
-void freeCheckpointCandEntry(void *param) {
- SCheckpointCandEntry *pEntry = param;
- taosMemoryFreeClear(pEntry->pName);
-}
-
-void freeTaskList(void* param) {
- SArray** pList = (SArray **)param;
- taosArrayDestroy(*pList);
-}
-
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
@@ -2202,4 +2180,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
-}
\ No newline at end of file
+}
diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c
index 005caea31b..97474fa851 100644
--- a/source/dnode/mnode/impl/src/mndStreamHb.c
+++ b/source/dnode/mnode/impl/src/mndStreamHb.c
@@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedTasks) > 0) {
- bool allReady = true;
- SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
- taosArrayDestroy(p);
+ bool allReady = true;
+ if (pMnode != NULL) {
+ SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
+ taosArrayDestroy(p);
+ } else {
+ allReady = false;
+ }
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
- pInfo->checkpointId, pInfo->transId);
+ pInfo->checkpointId, pInfo->transId);
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
}
diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c
index 235c604b27..3cabce2201 100644
--- a/source/dnode/mnode/impl/src/mndStreamUtil.c
+++ b/source/dnode/mnode/impl/src/mndStreamUtil.c
@@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
taosWUnLockLatch(&pStream->lock);
return 0;
}
+
+static void freeCheckpointCandEntry(void *param) {
+ SCheckpointCandEntry *pEntry = param;
+ taosMemoryFreeClear(pEntry->pName);
+}
+
+static void freeTaskList(void* param) {
+ SArray** pList = (SArray **)param;
+ taosArrayDestroy(*pList);
+}
+
+void mndInitExecInfo() {
+ taosThreadMutexInit(&execInfo.lock, NULL);
+ _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
+
+ execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
+ execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
+ execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
+ execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
+ execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
+
+ taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
+ taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
+}
diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt
index a002b20bde..bc5b5125f1 100644
--- a/source/dnode/mnode/impl/test/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/CMakeLists.txt
@@ -4,7 +4,7 @@ add_subdirectory(acct)
#add_subdirectory(db)
#add_subdirectory(dnode)
add_subdirectory(func)
-#add_subdirectory(mnode)
+add_subdirectory(stream)
add_subdirectory(profile)
add_subdirectory(qnode)
add_subdirectory(sdb)
diff --git a/source/dnode/mnode/impl/test/stream/CMakeLists.txt b/source/dnode/mnode/impl/test/stream/CMakeLists.txt
new file mode 100644
index 0000000000..b1bb62735f
--- /dev/null
+++ b/source/dnode/mnode/impl/test/stream/CMakeLists.txt
@@ -0,0 +1,13 @@
+SET(CMAKE_CXX_STANDARD 11)
+
+aux_source_directory(. MNODE_STREAM_TEST_SRC)
+add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
+target_link_libraries(
+ streamTest
+ PRIVATE dnode gtest
+)
+
+add_test(
+ NAME streamTest
+ COMMAND streamTest
+)
diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp
new file mode 100644
index 0000000000..a3babad80c
--- /dev/null
+++ b/source/dnode/mnode/impl/test/stream/stream.cpp
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wwrite-strings"
+#pragma GCC diagnostic ignored "-Wunused-function"
+#pragma GCC diagnostic ignored "-Wunused-variable"
+#pragma GCC diagnostic ignored "-Wsign-compare"
+
+#include
+#include
+#include "../../inc/mndStream.h"
+
+namespace {
+SRpcMsg buildHbReq() {
+ SStreamHbMsg msg = {0};
+ msg.vgId = 1;
+ msg.numOfTasks = 5;
+ msg.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry));
+
+ for (int32_t i = 0; i < 4; ++i) {
+ STaskStatusEntry entry = {0};
+ entry.nodeId = i + 1;
+ entry.stage = 1;
+ entry.id.taskId = i + 1;
+ entry.id.streamId = 999;
+
+ if (i == 0) {
+ entry.stage = 4;
+ }
+
+ taosArrayPush(msg.pTaskStatus, &entry);
+ }
+
+ // (p->checkpointId != 0) && p->checkpointFailed
+ // add failed checkpoint info
+ {
+ STaskStatusEntry entry = {0};
+ entry.nodeId = 5;
+ entry.stage = 1;
+
+ entry.id.taskId = 5;
+ entry.id.streamId = 999;
+
+ entry.checkpointId = 1;
+ entry.checkpointFailed = true;
+
+ taosArrayPush(msg.pTaskStatus, &entry);
+ }
+
+ int32_t tlen = 0;
+ int32_t code = 0;
+ SEncoder encoder;
+ void* buf = NULL;
+ SRpcMsg msg1 = {0};
+ msg1.info.noResp = 1;
+
+ tEncodeSize(tEncodeStreamHbMsg, &msg, tlen, code);
+ if (code < 0) {
+ goto _end;
+ }
+
+ buf = rpcMallocCont(tlen);
+ if (buf == NULL) {
+ goto _end;
+ }
+
+ tEncoderInit(&encoder, (uint8_t*)buf, tlen);
+ if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) {
+ rpcFreeCont(buf);
+ goto _end;
+ }
+ tEncoderClear(&encoder);
+
+ initRpcMsg(&msg1, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
+
+ taosArrayDestroy(msg.pTaskStatus);
+ return msg1;
+
+_end:
+ return msg1;
+}
+
+void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskId) {
+ SStreamExecInfo* pExecNode = &execInfo;
+
+ pTask->id.streamId = streamId;
+ pTask->id.taskId = taskId;
+ pTask->info.nodeId = nodeId;
+
+ STaskId id;
+ id.streamId = pTask->id.streamId;
+ id.taskId = pTask->id.taskId;
+
+ STaskStatusEntry entry;
+ streamTaskStatusInit(&entry, pTask);
+
+ entry.stage = 1;
+ entry.status = TASK_STATUS__READY;
+
+ taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
+ taosArrayPush(pExecNode->pTaskList, &id);
+}
+void initStreamExecInfo() {
+ SStreamExecInfo* pExecNode = &execInfo;
+
+ SStreamTask task = {0};
+ setTask(&task, 1, 999, 1);
+ setTask(&task, 1, 999, 2);
+ setTask(&task, 1, 999, 3);
+ setTask(&task, 1, 999, 4);
+ setTask(&task, 2, 999, 5);
+}
+
+void initNodeInfo() {
+ execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
+ SNodeEntry entry = {0};
+ entry.nodeId = 2;
+ entry.stageUpdated = true;
+ taosArrayPush(execInfo.pNodeList, &entry);
+}
+} // namespace
+
+int main(int argc, char** argv) {
+ testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+TEST(mndHbTest, handle_error_in_hb) {
+ mndInitExecInfo();
+ initStreamExecInfo();
+ initNodeInfo();
+
+ SRpcMsg msg = buildHbReq();
+ int32_t code = mndProcessStreamHb(&msg);
+
+ rpcFreeCont(msg.pCont);
+}
+
+#pragma GCC diagnostic pop
\ No newline at end of file
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 31a56de036..cb17b0f911 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -8223,7 +8223,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
return code;
}
- code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
+ code = nodesListAppend((*pSelect1)->pGroupByList, nodesCloneNode((const SNode*)pNode1));
if (code) {
return code;
}
@@ -8236,18 +8236,17 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
pNode2->groupingSetType = GP_TYPE_NORMAL;
pNode2->pParameterList = nodesMakeList();
if (NULL == pNode2->pParameterList) {
- nodesDestroyNode((SNode*)pNode1);
+ nodesDestroyNode((SNode*)pNode2);
return TSDB_CODE_OUT_OF_MEMORY;
}
- code = nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
+ code = nodesListAppend(pNode2->pParameterList, nodesCloneNode((const SNode*)pFunc2));
if (code) {
nodesDestroyNode((SNode*)pNode2);
return code;
}
- code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
- return code;
+ return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {