ci(test):delete invalid code&& add ci

This commit is contained in:
54liuyao 2024-12-26 11:16:02 +08:00
parent dcc2645691
commit f6500b3551
10 changed files with 77 additions and 39 deletions

View File

@ -389,7 +389,6 @@ typedef struct SStateStore {
int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
void (*streamStateSetFillInfo)(SStreamState* pState);
void (*streamStateClearExpiredState)(SStreamState* pState);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
@ -455,7 +454,6 @@ typedef struct SStateStore {
int32_t (*streamStateBegin)(SStreamState* pState);
void (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst);
} SStateStore;

View File

@ -34,7 +34,6 @@ void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState);
void streamStateCommit(SStreamState* pState);
void streamStateDestroy(SStreamState* pState, bool remove);
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
int32_t streamStateDelTaskDb(SStreamState* pState);
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
@ -108,7 +107,6 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
// twa
void streamStateSetFillInfo(SStreamState* pState);
void streamStateClearExpiredState(SStreamState* pState);
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);

View File

@ -67,7 +67,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);

View File

@ -68,7 +68,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
@ -117,7 +116,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
}

View File

@ -191,7 +191,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
@ -243,7 +242,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
}

View File

@ -254,6 +254,9 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList);
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
int32_t uploadCheckpointToS3(const char* id, const char* path);
int32_t deleteCheckpointFile(const char* id, const char* name);
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId);
#ifdef __cplusplus
}

View File

@ -19,7 +19,6 @@
#include "tcs.h"
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
#ifdef BUILD_NO_CALL
static int32_t deleteCheckpoint(const char* id);
@ -230,7 +229,7 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
return code;
}
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId) {
int32_t code = 0;
int32_t vgId = pTask->pMeta->vgId;

View File

@ -546,10 +546,6 @@ void streamStateDestroy(SStreamState* pState, bool remove) {
taosMemoryFreeClear(pState);
}
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
return deleteExpiredCheckPoint(pState->pFileState, mark);
}
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
@ -617,8 +613,6 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void**
void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); }
void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); }
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);

View File

@ -667,18 +667,6 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
}
}
int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (pFileState->searchBuff != NULL) {
deleteHashSortRowBuff(pFileState, pKey);
}
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -868,10 +856,6 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
}
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list);
}
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t maxCheckPointId = 0;
@ -1227,10 +1211,6 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
return pFileState->pGroupIdMap;
}
void setFillInfo(SStreamFileState* pFileState) {
pFileState->hasFillCatch = false;
}
void clearExpiredState(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1261,6 +1241,7 @@ _end:
}
}
#ifdef BUILD_NO_CALL
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1328,6 +1309,7 @@ _end:
}
return code;
}
#endif
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode) {

View File

@ -390,9 +390,78 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest)
extern int8_t tsS3EpNum;
tsS3EpNum = 1;
code = uploadCheckpointToS3("123", "/tmp/backend5/stream");
code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream");
EXPECT_EQ(code, TSDB_CODE_SUCCESS);
code = downloadCheckpointByNameS3("123", "/root/download", "");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
code = deleteCheckpointFile("aaa123", "bbb");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
}
TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) {
SStreamTask* pTask = NULL;
int64_t uid = 2222222222222;
SArray* array = taosArrayInit(4, POINTER_BYTES);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array,
false, 1, &pTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
initTaskLock(pTask);
const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream";
code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
ASSERT(pState != NULL);
pTask->pBackend = pState->pTdbState->pOwner->pBackend;
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.checkpointId = 123;
code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
pTask->chkInfo.pActiveInfo->failedId = 223;
code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
SStreamDataBlock block;
block.srcTaskId = 456;
SStreamTask upTask;
upTask = *pTask;
upTask.id.taskId = 456;
streamTaskSetUpstreamInfo(pTask, &upTask);
pTask->chkInfo.pActiveInfo->failedId = 23;
code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
streamTaskSetUpstreamInfo(pTask, &upTask);
streamTaskSetStatusReady(pTask);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->activeId = 223;
STaskCheckpointReadyInfo readyInfo;
readyInfo.upstreamTaskId = 4567;
block.srcTaskId = 4567;
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1);
initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1);
taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo);
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_NE(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1;
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_NE(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->activeId = 1111;
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
}