From 5b10753ac08e9b54c50f19d2dbf8a1f6e15e5248 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Dec 2024 18:23:57 +0800 Subject: [PATCH] add ut --- source/libs/stream/inc/streamInt.h | 3 + source/libs/stream/src/streamCheckpoint.c | 6 +- .../libs/stream/test/streamCheckPointTest.cpp | 203 ++++++++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 source/libs/stream/test/streamCheckPointTest.cpp diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c25b0f34fc..41ac0117f3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -250,6 +250,9 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); // inject stream errors void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); +int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type); +int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 876cd1b472..0ec66cd2ce 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -21,7 +21,9 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); +#ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); +#endif static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, @@ -998,7 +1000,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } -static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { +int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { const char* id = pTask->id.idStr; SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -1492,6 +1494,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t che return 0; } +#ifdef BUILD_NO_CALL int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); @@ -1504,6 +1507,7 @@ int32_t deleteCheckpoint(const char* id) { } return 0; } +#endif int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp new file mode 100644 index 0000000000..cad02a514e --- /dev/null +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -0,0 +1,203 @@ +#include +#include "tstream.h" +#include "streamInt.h" + +TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->transId = 4561111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SCheckpointTriggerRsp pRsp; + memset(&pRsp, 0, sizeof(SCheckpointTriggerRsp)); + pRsp.rspCode = TSDB_CODE_SUCCESS; + pRsp.checkpointId = 123; + pRsp.transId = 456; + pRsp.upstreamTaskId = 789; + + code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pRsp.rspCode = TSDB_CODE_FAILED; + code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + tFreeStreamTask(pTask); +} + +TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + pInfo->failedId = 0; + + int64_t failedCheckpointId = 123; + + streamTaskSetFailedCheckpointId(pTask, failedCheckpointId); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + + streamTaskSetFailedCheckpointId(pTask, 0); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + + streamTaskSetFailedCheckpointId(pTask, pInfo->failedId - 1); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + tFreeStreamTask(pTask); +} + +TEST(UploadCheckpointDataTest, UploadSuccess) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int64_t checkpointId = 123; + int64_t dbRefId = 1; + ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_S3; + + STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; + const char *path = "/tmp/backend3/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0); + ASSERT(code == 0); + + int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); + + EXPECT_EQ(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 on success"; +} + +TEST(UploadCheckpointDataTest, UploadDisabled) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int64_t checkpointId = 123; + int64_t dbRefId = 1; + + STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; + const char *path = "/tmp/backend4/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0); + ASSERT(code == 0); + + ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_DISABLE; + + int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); + + EXPECT_NE(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 when backup type is disabled"; +} + +TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->transId = 4561111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t downstreamNodeId = 1; + int64_t sendingCheckpointId = 123; + + STaskTriggerSendInfo triggerInfo = {.sendTs = taosGetTimestampMs(), .recved = false, .nodeId = downstreamNodeId }; + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + bool result = streamTaskAlreadySendTrigger(pTask, downstreamNodeId); + + EXPECT_TRUE(result) << "The trigger message should have been sent to the downstream node"; + + taosArrayDestroy(pTask->chkInfo.pActiveInfo->pDispatchTriggerList); +} + +TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + const char *path = "/tmp/backend5/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr.launchChkptId = pTask->chkInfo.pActiveInfo->activeId; + pTask->chkInfo.pActiveInfo->transId = 4561111; + pTask->chkInfo.startTs = 11111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t downstreamNodeId = 1; + int64_t sendingCheckpointId = 123; + + STaskTriggerSendInfo triggerInfo = {.sendTs = taosGetTimestampMs(), .recved = false, .nodeId = downstreamNodeId }; + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + SArray* array1 = NULL; + code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); + + EXPECT_EQ(code, TSDB_CODE_SUCCESS); +}