Merge pull request #24738 from taosdata/fix/3_liaohj
test(stream): add unit test case for the mnode stream module.
This commit is contained in:
commit
ecdcc6dee1
|
@ -13,6 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _STREAM_STATE_H_
|
||||
#define _STREAM_STATE_H_
|
||||
|
||||
#include "tdatablock.h"
|
||||
|
||||
#include "rocksdb/c.h"
|
||||
|
@ -20,9 +23,6 @@
|
|||
#include "tsimplehash.h"
|
||||
#include "tstreamFileState.h"
|
||||
|
||||
#ifndef _STREAM_STATE_H_
|
||||
#define _STREAM_STATE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
|
|
@ -13,6 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _STREAM_H_
|
||||
#define _STREAM_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "streamState.h"
|
||||
#include "tdatablock.h"
|
||||
|
@ -26,9 +29,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#ifndef _STREAM_H_
|
||||
#define _STREAM_H_
|
||||
|
||||
#define ONE_MiB_F (1048576.0)
|
||||
#define ONE_KiB_F (1024.0)
|
||||
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
||||
|
|
|
@ -556,7 +556,7 @@ typedef struct {
|
|||
} SMqConsumerObj;
|
||||
|
||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
|
||||
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
||||
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
||||
|
||||
|
|
|
@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
|||
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
||||
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
||||
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
|
||||
void mndInitExecInfo();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
||||
static void freeCheckpointCandEntry(void *);
|
||||
static void freeTaskList(void *param);
|
||||
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
||||
|
@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
||||
|
||||
taosThreadMutexInit(&execInfo.lock, NULL);
|
||||
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||
|
||||
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
||||
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
|
||||
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
||||
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
||||
mndInitExecInfo();
|
||||
|
||||
if (sdbSetTable(pMnode->pSdb, table) != 0) {
|
||||
return -1;
|
||||
|
@ -2117,16 +2105,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
|||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||
}
|
||||
|
||||
void freeCheckpointCandEntry(void *param) {
|
||||
SCheckpointCandEntry *pEntry = param;
|
||||
taosMemoryFreeClear(pEntry->pName);
|
||||
}
|
||||
|
||||
void freeTaskList(void* param) {
|
||||
SArray** pList = (SArray **)param;
|
||||
taosArrayDestroy(*pList);
|
||||
}
|
||||
|
||||
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
|
@ -2202,4 +2180,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
// current checkpoint is failed, rollback from the checkpoint trans
|
||||
// kill the checkpoint trans and then set all tasks status to be normal
|
||||
if (taosArrayGetSize(pFailedTasks) > 0) {
|
||||
bool allReady = true;
|
||||
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||
taosArrayDestroy(p);
|
||||
bool allReady = true;
|
||||
if (pMnode != NULL) {
|
||||
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||
taosArrayDestroy(p);
|
||||
} else {
|
||||
allReady = false;
|
||||
}
|
||||
|
||||
if (allReady || snodeChanged) {
|
||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
|
||||
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
|
||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||
pInfo->checkpointId, pInfo->transId);
|
||||
pInfo->checkpointId, pInfo->transId);
|
||||
|
||||
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
||||
}
|
||||
|
|
|
@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
|||
taosWUnLockLatch(&pStream->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void freeCheckpointCandEntry(void *param) {
|
||||
SCheckpointCandEntry *pEntry = param;
|
||||
taosMemoryFreeClear(pEntry->pName);
|
||||
}
|
||||
|
||||
static void freeTaskList(void* param) {
|
||||
SArray** pList = (SArray **)param;
|
||||
taosArrayDestroy(*pList);
|
||||
}
|
||||
|
||||
void mndInitExecInfo() {
|
||||
taosThreadMutexInit(&execInfo.lock, NULL);
|
||||
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||
|
||||
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
||||
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
|
||||
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
||||
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ add_subdirectory(acct)
|
|||
#add_subdirectory(db)
|
||||
#add_subdirectory(dnode)
|
||||
add_subdirectory(func)
|
||||
#add_subdirectory(mnode)
|
||||
add_subdirectory(stream)
|
||||
add_subdirectory(profile)
|
||||
add_subdirectory(qnode)
|
||||
add_subdirectory(sdb)
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
SET(CMAKE_CXX_STANDARD 11)
|
||||
|
||||
aux_source_directory(. MNODE_STREAM_TEST_SRC)
|
||||
add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
|
||||
target_link_libraries(
|
||||
streamTest
|
||||
PRIVATE dnode gtest
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME streamTest
|
||||
COMMAND streamTest
|
||||
)
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
|
||||
#include <libs/stream/tstream.h>
|
||||
#include <libs/transport/trpc.h>
|
||||
#include "../../inc/mndStream.h"
|
||||
|
||||
namespace {
|
||||
SRpcMsg buildHbReq() {
|
||||
SStreamHbMsg msg = {0};
|
||||
msg.vgId = 1;
|
||||
msg.numOfTasks = 5;
|
||||
msg.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry));
|
||||
|
||||
for (int32_t i = 0; i < 4; ++i) {
|
||||
STaskStatusEntry entry = {0};
|
||||
entry.nodeId = i + 1;
|
||||
entry.stage = 1;
|
||||
entry.id.taskId = i + 1;
|
||||
entry.id.streamId = 999;
|
||||
|
||||
if (i == 0) {
|
||||
entry.stage = 4;
|
||||
}
|
||||
|
||||
taosArrayPush(msg.pTaskStatus, &entry);
|
||||
}
|
||||
|
||||
// (p->checkpointId != 0) && p->checkpointFailed
|
||||
// add failed checkpoint info
|
||||
{
|
||||
STaskStatusEntry entry = {0};
|
||||
entry.nodeId = 5;
|
||||
entry.stage = 1;
|
||||
|
||||
entry.id.taskId = 5;
|
||||
entry.id.streamId = 999;
|
||||
|
||||
entry.checkpointId = 1;
|
||||
entry.checkpointFailed = true;
|
||||
|
||||
taosArrayPush(msg.pTaskStatus, &entry);
|
||||
}
|
||||
|
||||
int32_t tlen = 0;
|
||||
int32_t code = 0;
|
||||
SEncoder encoder;
|
||||
void* buf = NULL;
|
||||
SRpcMsg msg1 = {0};
|
||||
msg1.info.noResp = 1;
|
||||
|
||||
tEncodeSize(tEncodeStreamHbMsg, &msg, tlen, code);
|
||||
if (code < 0) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
tEncoderInit(&encoder, (uint8_t*)buf, tlen);
|
||||
if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
goto _end;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
initRpcMsg(&msg1, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||
|
||||
taosArrayDestroy(msg.pTaskStatus);
|
||||
return msg1;
|
||||
|
||||
_end:
|
||||
return msg1;
|
||||
}
|
||||
|
||||
void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskId) {
|
||||
SStreamExecInfo* pExecNode = &execInfo;
|
||||
|
||||
pTask->id.streamId = streamId;
|
||||
pTask->id.taskId = taskId;
|
||||
pTask->info.nodeId = nodeId;
|
||||
|
||||
STaskId id;
|
||||
id.streamId = pTask->id.streamId;
|
||||
id.taskId = pTask->id.taskId;
|
||||
|
||||
STaskStatusEntry entry;
|
||||
streamTaskStatusInit(&entry, pTask);
|
||||
|
||||
entry.stage = 1;
|
||||
entry.status = TASK_STATUS__READY;
|
||||
|
||||
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
|
||||
taosArrayPush(pExecNode->pTaskList, &id);
|
||||
}
|
||||
void initStreamExecInfo() {
|
||||
SStreamExecInfo* pExecNode = &execInfo;
|
||||
|
||||
SStreamTask task = {0};
|
||||
setTask(&task, 1, 999, 1);
|
||||
setTask(&task, 1, 999, 2);
|
||||
setTask(&task, 1, 999, 3);
|
||||
setTask(&task, 1, 999, 4);
|
||||
setTask(&task, 2, 999, 5);
|
||||
}
|
||||
|
||||
void initNodeInfo() {
|
||||
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||
SNodeEntry entry = {0};
|
||||
entry.nodeId = 2;
|
||||
entry.stageUpdated = true;
|
||||
taosArrayPush(execInfo.pNodeList, &entry);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
TEST(mndHbTest, handle_error_in_hb) {
|
||||
mndInitExecInfo();
|
||||
initStreamExecInfo();
|
||||
initNodeInfo();
|
||||
|
||||
SRpcMsg msg = buildHbReq();
|
||||
int32_t code = mndProcessStreamHb(&msg);
|
||||
|
||||
rpcFreeCont(msg.pCont);
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
|
@ -8223,7 +8223,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
|||
return code;
|
||||
}
|
||||
|
||||
code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
|
||||
code = nodesListAppend((*pSelect1)->pGroupByList, nodesCloneNode((const SNode*)pNode1));
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
@ -8236,18 +8236,17 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
|||
pNode2->groupingSetType = GP_TYPE_NORMAL;
|
||||
pNode2->pParameterList = nodesMakeList();
|
||||
if (NULL == pNode2->pParameterList) {
|
||||
nodesDestroyNode((SNode*)pNode1);
|
||||
nodesDestroyNode((SNode*)pNode2);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
code = nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
|
||||
code = nodesListAppend(pNode2->pParameterList, nodesCloneNode((const SNode*)pFunc2));
|
||||
if (code) {
|
||||
nodesDestroyNode((SNode*)pNode2);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
|
||||
return code;
|
||||
return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
|
||||
}
|
||||
|
||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||
|
|
Loading…
Reference in New Issue