From 4adf24b466bba5999ced38929f92397eee7df96a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 23 Dec 2024 18:25:25 +0800 Subject: [PATCH 1/3] ci(stream):add stream unit test --- source/libs/stream/inc/streamInt.h | 2 + source/libs/stream/src/streamCheckpoint.c | 3 +- .../libs/stream/test/streamCheckPointTest.cpp | 125 ++++++++++++++++++ 3 files changed, 128 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 41ac0117f3..7af64c041d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -252,6 +252,8 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type); int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); +int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); +int32_t uploadCheckpointToS3(const char* id, const char* path); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0ec66cd2ce..ebde9fe50e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -24,7 +24,6 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int6 #ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); #endif -static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, int32_t srcTaskId); @@ -1355,7 +1354,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } } -static int32_t uploadCheckpointToS3(const char* id, const char* path) { +int32_t uploadCheckpointToS3(const char* id, const char* path) { int32_t code = 0; int32_t nBytes = 0; /* diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 80dd3ec142..17d8b55560 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -1,6 +1,7 @@ #include #include "tstream.h" #include "streamInt.h" +#include "tcs.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -217,6 +218,10 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { taosArrayDestroy(array); } +int32_t sendReq1111(const SEpSet *pEpSet, SRpcMsg *pMsg) { + return TSDB_CODE_SUCCESS; +} + TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { SStreamTask* pTask = NULL; int64_t uid = 2222222222222; @@ -239,6 +244,11 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { pTask->chkInfo.pActiveInfo->transId = 4561111; pTask->chkInfo.startTs = 11111; + SStreamTask upTask; + upTask = *pTask; + streamTaskSetUpstreamInfo(pTask, &upTask); + + streamTaskSetStatusReady(pTask); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -254,7 +264,21 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + STaskCheckpointReadyInfo readyInfo; + readyInfo.upstreamNodeId = 789111; + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1); + + initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1); + taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + + SMsgCb msgCb = {0}; + msgCb.sendReqFp = sendReq1111; + msgCb.mgmt = (SMgmtWrapper*)(&msgCb); // hack + tmsgSetDefault(&msgCb); + SArray* array1 = NULL; code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); EXPECT_EQ(code, TSDB_CODE_SUCCESS); @@ -268,3 +292,104 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { taosArrayDestroy(array); taosArrayDestroy(array1); } + +TEST(StreamTaskSendCheckpointTriggerMsgTest, SendCheckpointTriggerMsgSuccessTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/SendCheckpointTriggerMsgSuccessTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SRpcHandleInfo rpcInfo; + + int32_t ret = streamTaskSendCheckpointTriggerMsg(pTask, 123, 456, &rpcInfo, code); + + EXPECT_EQ(ret, TSDB_CODE_SUCCESS); +} + +TEST(streamTaskBuildCheckpointTest, streamTaskBuildCheckpointFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/streamTaskBuildCheckpoinTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + char a[] = "localhost"; + memcpy(tsSnodeAddress, a, sizeof(a)); + + int32_t ret = streamTaskBuildCheckpoint(pTask); + + EXPECT_NE(ret, TSDB_CODE_SUCCESS); +} + +int32_t s3GetObjectToFileTest(const char *object_name, const char *fileName) { + return TSDB_CODE_SUCCESS; +} + +TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + SStreamTask upTask; + upTask = *pTask; + code = streamTaskSetUpstreamInfo(pTask, &upTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskSetUpstreamInfo(pTask, &upTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t recv = 0; + int32_t total = 0; + pTask->info.taskLevel = TASK_LEVEL__SOURCE; + streamTaskGetTriggerRecvStatus(pTask, &recv, &total); + EXPECT_EQ(total, 1); + + pTask->info.taskLevel = TASK_LEVEL__AGG; + streamTaskGetTriggerRecvStatus(pTask, &recv, &total); + EXPECT_EQ(total, 2); + + code = streamTaskDownloadCheckpointData("123", "/root/download", 123); + EXPECT_NE(code, TSDB_CODE_SUCCESS); + + tcsInit(); + + code = uploadCheckpointToS3("123", "/tmp/backend5/stream"); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + + code = downloadCheckpointByNameS3("123", "/root/download", "123"); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); +} From 0452c21ff600926e6936861444bffbe25c24ebc3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 23 Dec 2024 19:10:10 +0800 Subject: [PATCH 2/3] ci(stream):add stream unit test --- source/libs/stream/test/streamCheckPointTest.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 17d8b55560..c8297d56b7 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -2,6 +2,7 @@ #include "tstream.h" #include "streamInt.h" #include "tcs.h" +#include "tglobal.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -386,10 +387,12 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) EXPECT_NE(code, TSDB_CODE_SUCCESS); tcsInit(); + extern int8_t tsS3EpNum; + tsS3EpNum = 1; code = uploadCheckpointToS3("123", "/tmp/backend5/stream"); EXPECT_EQ(code, TSDB_CODE_SUCCESS); - code = downloadCheckpointByNameS3("123", "/root/download", "123"); - EXPECT_EQ(code, TSDB_CODE_SUCCESS); + code = downloadCheckpointByNameS3("123", "/root/download", ""); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); } From 0497642545e6b3894e8fc57d8bf340fa6a29bf2f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 23 Dec 2024 19:38:13 +0800 Subject: [PATCH 3/3] fix issue --- include/common/tglobal.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 584c4b5775..501f1cabc1 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -293,6 +293,7 @@ extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern int32_t tsResolveFQDNRetryTime; extern bool tsStreamCoverage; +extern int8_t tsS3EpNum; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)