test(stream): add unit test cases.

This commit is contained in:
Haojun Liao 2024-02-20 10:27:02 +08:00
parent 931b9f0b51
commit 03bc2d7c6d
5 changed files with 121 additions and 34 deletions

View File

@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);

View File

@ -65,7 +65,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
taosArrayPush(pList, pInfo);
}
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
@ -115,7 +115,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
} else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream);
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
}
}

View File

@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
return mndTransAppendRedoAction(pTrans, &action);
}
static bool identicalName(const char* pDb, const char* pParam, int32_t len) {
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
void *pIter = NULL;
// not checkpoint trans, ignore
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
return TSDB_CODE_SUCCESS;
continue;
}
char *pDupDBName = strndup(pDBName, len);
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
taosMemoryFree(pDupDBName);
SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId);
if (pStream != NULL) {
if (identicalName(pStream->sourceDb, pDBName, len)) {
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
} else if (identicalName(pStream->targetDb, pDBName, len)) {
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb);
}
mndReleaseStream(pMnode, pStream);
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
taosMemoryFree(pReq);
return -1;
}
mDebug("set the resume action for trans:%d", pTrans->id);
return 0;
}

File diff suppressed because one or more lines are too long