diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 584c4b5775..501f1cabc1 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -293,6 +293,7 @@ extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern int32_t tsResolveFQDNRetryTime; extern bool tsStreamCoverage; +extern int8_t tsS3EpNum; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 41ac0117f3..7af64c041d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -252,6 +252,8 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type); int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); +int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); +int32_t uploadCheckpointToS3(const char* id, const char* path); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0ec66cd2ce..ebde9fe50e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -24,7 +24,6 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int6 #ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); #endif -static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, int32_t srcTaskId); @@ -1355,7 +1354,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } } -static int32_t uploadCheckpointToS3(const char* id, const char* path) { +int32_t uploadCheckpointToS3(const char* id, const char* path) { int32_t code = 0; int32_t nBytes = 0; /* diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 80dd3ec142..c8297d56b7 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -1,6 +1,8 @@ #include #include "tstream.h" #include "streamInt.h" +#include "tcs.h" +#include "tglobal.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -217,6 +219,10 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { taosArrayDestroy(array); } +int32_t sendReq1111(const SEpSet *pEpSet, SRpcMsg *pMsg) { + return TSDB_CODE_SUCCESS; +} + TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { SStreamTask* pTask = NULL; int64_t uid = 2222222222222; @@ -239,6 +245,11 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { pTask->chkInfo.pActiveInfo->transId = 4561111; pTask->chkInfo.startTs = 11111; + SStreamTask upTask; + upTask = *pTask; + streamTaskSetUpstreamInfo(pTask, &upTask); + + streamTaskSetStatusReady(pTask); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -254,7 +265,21 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + STaskCheckpointReadyInfo readyInfo; + readyInfo.upstreamNodeId = 789111; + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1); + + initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1); + taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + + SMsgCb msgCb = {0}; + msgCb.sendReqFp = sendReq1111; + msgCb.mgmt = (SMgmtWrapper*)(&msgCb); // hack + tmsgSetDefault(&msgCb); + SArray* array1 = NULL; code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); EXPECT_EQ(code, TSDB_CODE_SUCCESS); @@ -268,3 +293,106 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { taosArrayDestroy(array); taosArrayDestroy(array1); } + +TEST(StreamTaskSendCheckpointTriggerMsgTest, SendCheckpointTriggerMsgSuccessTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/SendCheckpointTriggerMsgSuccessTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SRpcHandleInfo rpcInfo; + + int32_t ret = streamTaskSendCheckpointTriggerMsg(pTask, 123, 456, &rpcInfo, code); + + EXPECT_EQ(ret, TSDB_CODE_SUCCESS); +} + +TEST(streamTaskBuildCheckpointTest, streamTaskBuildCheckpointFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/streamTaskBuildCheckpoinTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + char a[] = "localhost"; + memcpy(tsSnodeAddress, a, sizeof(a)); + + int32_t ret = streamTaskBuildCheckpoint(pTask); + + EXPECT_NE(ret, TSDB_CODE_SUCCESS); +} + +int32_t s3GetObjectToFileTest(const char *object_name, const char *fileName) { + return TSDB_CODE_SUCCESS; +} + +TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + SStreamTask upTask; + upTask = *pTask; + code = streamTaskSetUpstreamInfo(pTask, &upTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskSetUpstreamInfo(pTask, &upTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t recv = 0; + int32_t total = 0; + pTask->info.taskLevel = TASK_LEVEL__SOURCE; + streamTaskGetTriggerRecvStatus(pTask, &recv, &total); + EXPECT_EQ(total, 1); + + pTask->info.taskLevel = TASK_LEVEL__AGG; + streamTaskGetTriggerRecvStatus(pTask, &recv, &total); + EXPECT_EQ(total, 2); + + code = streamTaskDownloadCheckpointData("123", "/root/download", 123); + EXPECT_NE(code, TSDB_CODE_SUCCESS); + + tcsInit(); + extern int8_t tsS3EpNum; + tsS3EpNum = 1; + + code = uploadCheckpointToS3("123", "/tmp/backend5/stream"); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + + code = downloadCheckpointByNameS3("123", "/root/download", ""); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); +}