Merge pull request #29289 from taosdata/fix/TD-33270-2

ci(stream):add stream unit test
This commit is contained in:
Shengliang Guan 2024-12-24 13:40:38 +08:00 committed by GitHub
commit 46c00ce4a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 132 additions and 2 deletions

View File

@ -293,6 +293,7 @@ extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold; extern int32_t tsPQSortMemThreshold;
extern int32_t tsResolveFQDNRetryTime; extern int32_t tsResolveFQDNRetryTime;
extern bool tsStreamCoverage; extern bool tsStreamCoverage;
extern int8_t tsS3EpNum;
extern bool tsExperimental; extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -252,6 +252,8 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId);
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type); int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type);
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList);
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
int32_t uploadCheckpointToS3(const char* id, const char* path);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -24,7 +24,6 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int6
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
static int32_t deleteCheckpoint(const char* id); static int32_t deleteCheckpoint(const char* id);
#endif #endif
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId, int32_t srcTaskId); int32_t transId, int32_t srcTaskId);
@ -1355,7 +1354,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
} }
} }
static int32_t uploadCheckpointToS3(const char* id, const char* path) { int32_t uploadCheckpointToS3(const char* id, const char* path) {
int32_t code = 0; int32_t code = 0;
int32_t nBytes = 0; int32_t nBytes = 0;
/* /*

View File

@ -1,6 +1,8 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "tstream.h" #include "tstream.h"
#include "streamInt.h" #include "streamInt.h"
#include "tcs.h"
#include "tglobal.h"
#pragma GCC diagnostic push #pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wwrite-strings"
@ -217,6 +219,10 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) {
taosArrayDestroy(array); taosArrayDestroy(array);
} }
int32_t sendReq1111(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) {
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int64_t uid = 2222222222222; int64_t uid = 2222222222222;
@ -239,6 +245,11 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) {
pTask->chkInfo.pActiveInfo->transId = 4561111; pTask->chkInfo.pActiveInfo->transId = 4561111;
pTask->chkInfo.startTs = 11111; pTask->chkInfo.startTs = 11111;
SStreamTask upTask;
upTask = *pTask;
streamTaskSetUpstreamInfo(pTask, &upTask);
streamTaskSetStatusReady(pTask); streamTaskSetStatusReady(pTask);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT_EQ(code, TSDB_CODE_SUCCESS); ASSERT_EQ(code, TSDB_CODE_SUCCESS);
@ -254,7 +265,21 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) {
taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo);
STaskCheckpointReadyInfo readyInfo;
readyInfo.upstreamNodeId = 789111;
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1);
initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1);
taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo);
pTask->chkInfo.pActiveInfo->dispatchTrigger = true; pTask->chkInfo.pActiveInfo->dispatchTrigger = true;
SMsgCb msgCb = {0};
msgCb.sendReqFp = sendReq1111;
msgCb.mgmt = (SMgmtWrapper*)(&msgCb); // hack
tmsgSetDefault(&msgCb);
SArray* array1 = NULL; SArray* array1 = NULL;
code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1);
EXPECT_EQ(code, TSDB_CODE_SUCCESS); EXPECT_EQ(code, TSDB_CODE_SUCCESS);
@ -268,3 +293,106 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) {
taosArrayDestroy(array); taosArrayDestroy(array);
taosArrayDestroy(array1); taosArrayDestroy(array1);
} }
TEST(StreamTaskSendCheckpointTriggerMsgTest, SendCheckpointTriggerMsgSuccessTest) {
SStreamTask* pTask = NULL;
int64_t uid = 2222222222222;
SArray* array = taosArrayInit(4, POINTER_BYTES);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array,
false, 1, &pTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
initTaskLock(pTask);
const char *path = "/tmp/SendCheckpointTriggerMsgSuccessTest/stream";
code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
SRpcHandleInfo rpcInfo;
int32_t ret = streamTaskSendCheckpointTriggerMsg(pTask, 123, 456, &rpcInfo, code);
EXPECT_EQ(ret, TSDB_CODE_SUCCESS);
}
TEST(streamTaskBuildCheckpointTest, streamTaskBuildCheckpointFnTest) {
SStreamTask* pTask = NULL;
int64_t uid = 2222222222222;
SArray* array = taosArrayInit(4, POINTER_BYTES);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array,
false, 1, &pTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
initTaskLock(pTask);
const char *path = "/tmp/streamTaskBuildCheckpoinTest/stream";
code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
ASSERT(pState != NULL);
pTask->pBackend = pState->pTdbState->pOwner->pBackend;
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
char a[] = "localhost";
memcpy(tsSnodeAddress, a, sizeof(a));
int32_t ret = streamTaskBuildCheckpoint(pTask);
EXPECT_NE(ret, TSDB_CODE_SUCCESS);
}
int32_t s3GetObjectToFileTest(const char *object_name, const char *fileName) {
return TSDB_CODE_SUCCESS;
}
TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) {
SStreamTask* pTask = NULL;
int64_t uid = 2222222222222;
SArray* array = taosArrayInit(4, POINTER_BYTES);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array,
false, 1, &pTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
initTaskLock(pTask);
SStreamTask upTask;
upTask = *pTask;
code = streamTaskSetUpstreamInfo(pTask, &upTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
code = streamTaskSetUpstreamInfo(pTask, &upTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
int32_t recv = 0;
int32_t total = 0;
pTask->info.taskLevel = TASK_LEVEL__SOURCE;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
EXPECT_EQ(total, 1);
pTask->info.taskLevel = TASK_LEVEL__AGG;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
EXPECT_EQ(total, 2);
code = streamTaskDownloadCheckpointData("123", "/root/download", 123);
EXPECT_NE(code, TSDB_CODE_SUCCESS);
tcsInit();
extern int8_t tsS3EpNum;
tsS3EpNum = 1;
code = uploadCheckpointToS3("123", "/tmp/backend5/stream");
EXPECT_EQ(code, TSDB_CODE_SUCCESS);
code = downloadCheckpointByNameS3("123", "/root/download", "");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
}