test(stream): add unit test case for the mnode stream module.
This commit is contained in:
parent
6736fd1615
commit
739485a419
|
@ -13,6 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifndef _STREAM_STATE_H_
|
||||||
|
#define _STREAM_STATE_H_
|
||||||
|
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
#include "rocksdb/c.h"
|
#include "rocksdb/c.h"
|
||||||
|
@ -20,9 +23,6 @@
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
#include "tstreamFileState.h"
|
#include "tstreamFileState.h"
|
||||||
|
|
||||||
#ifndef _STREAM_STATE_H_
|
|
||||||
#define _STREAM_STATE_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -13,6 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifndef _STREAM_H_
|
||||||
|
#define _STREAM_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -26,9 +29,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifndef _STREAM_H_
|
|
||||||
#define _STREAM_H_
|
|
||||||
|
|
||||||
#define ONE_MiB_F (1048576.0)
|
#define ONE_MiB_F (1048576.0)
|
||||||
#define ONE_KiB_F (1024.0)
|
#define ONE_KiB_F (1024.0)
|
||||||
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
||||||
|
|
|
@ -553,7 +553,7 @@ typedef struct {
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
|
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
|
||||||
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
||||||
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
||||||
|
|
||||||
|
|
|
@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
||||||
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
||||||
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
||||||
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
|
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
|
||||||
|
void mndInitExecInfo();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||||
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
||||||
static void freeCheckpointCandEntry(void *);
|
|
||||||
static void freeTaskList(void *param);
|
|
||||||
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
||||||
|
@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
||||||
|
|
||||||
taosThreadMutexInit(&execInfo.lock, NULL);
|
mndInitExecInfo();
|
||||||
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
|
||||||
|
|
||||||
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
|
||||||
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
|
||||||
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
|
||||||
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
|
||||||
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
|
||||||
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
|
||||||
|
|
||||||
if (sdbSetTable(pMnode->pSdb, table) != 0) {
|
if (sdbSetTable(pMnode->pSdb, table) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -2118,16 +2106,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeCheckpointCandEntry(void *param) {
|
|
||||||
SCheckpointCandEntry *pEntry = param;
|
|
||||||
taosMemoryFreeClear(pEntry->pName);
|
|
||||||
}
|
|
||||||
|
|
||||||
void freeTaskList(void* param) {
|
|
||||||
SArray** pList = (SArray **)param;
|
|
||||||
taosArrayDestroy(*pList);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
||||||
int32_t num = taosArrayGetSize(pList);
|
int32_t num = taosArrayGetSize(pList);
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
@ -2203,4 +2181,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
// current checkpoint is failed, rollback from the checkpoint trans
|
// current checkpoint is failed, rollback from the checkpoint trans
|
||||||
// kill the checkpoint trans and then set all tasks status to be normal
|
// kill the checkpoint trans and then set all tasks status to be normal
|
||||||
if (taosArrayGetSize(pFailedTasks) > 0) {
|
if (taosArrayGetSize(pFailedTasks) > 0) {
|
||||||
bool allReady = true;
|
bool allReady = true;
|
||||||
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
if (pMnode != NULL) {
|
||||||
taosArrayDestroy(p);
|
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||||
|
taosArrayDestroy(p);
|
||||||
|
} else {
|
||||||
|
allReady = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (allReady || snodeChanged) {
|
if (allReady || snodeChanged) {
|
||||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
|
||||||
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
|
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
|
||||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||||
pInfo->checkpointId, pInfo->transId);
|
pInfo->checkpointId, pInfo->transId);
|
||||||
|
|
||||||
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void freeCheckpointCandEntry(void *param) {
|
||||||
|
SCheckpointCandEntry *pEntry = param;
|
||||||
|
taosMemoryFreeClear(pEntry->pName);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void freeTaskList(void* param) {
|
||||||
|
SArray** pList = (SArray **)param;
|
||||||
|
taosArrayDestroy(*pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndInitExecInfo() {
|
||||||
|
taosThreadMutexInit(&execInfo.lock, NULL);
|
||||||
|
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||||
|
|
||||||
|
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||||
|
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
||||||
|
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||||
|
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||||
|
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
||||||
|
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
||||||
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ add_subdirectory(acct)
|
||||||
#add_subdirectory(db)
|
#add_subdirectory(db)
|
||||||
#add_subdirectory(dnode)
|
#add_subdirectory(dnode)
|
||||||
add_subdirectory(func)
|
add_subdirectory(func)
|
||||||
#add_subdirectory(mnode)
|
add_subdirectory(stream)
|
||||||
add_subdirectory(profile)
|
add_subdirectory(profile)
|
||||||
add_subdirectory(qnode)
|
add_subdirectory(qnode)
|
||||||
add_subdirectory(sdb)
|
add_subdirectory(sdb)
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
|
|
||||||
|
aux_source_directory(. MNODE_STREAM_TEST_SRC)
|
||||||
|
add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
streamTest
|
||||||
|
PRIVATE dnode gtest
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME streamTest
|
||||||
|
COMMAND streamTest
|
||||||
|
)
|
Loading…
Reference in New Issue