diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index a2be706dc0..7a4401827c 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -389,7 +389,6 @@ typedef struct SStateStore { int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - void (*streamStateSetFillInfo)(SStreamState* pState); void (*streamStateClearExpiredState)(SStreamState* pState); int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, @@ -455,7 +454,6 @@ typedef struct SStateStore { int32_t (*streamStateBegin)(SStreamState* pState); void (*streamStateCommit)(SStreamState* pState); void (*streamStateDestroy)(SStreamState* pState, bool remove); - int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst); } SStateStore; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 2179547352..b4e0087b1a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,6 @@ void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); void streamStateCommit(SStreamState* pState); void streamStateDestroy(SStreamState* pState, bool remove); -int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); int32_t streamStateDelTaskDb(SStreamState* pState); int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); @@ -108,7 +107,6 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); // twa -void streamStateSetFillInfo(SStreamState* pState); void streamStateClearExpiredState(SStreamState* pState); void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index f1f5b00e38..f47c308e18 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -67,7 +67,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); -int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 4fe4333534..68dc981338 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -68,7 +68,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; - pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; @@ -117,7 +116,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; - pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateCopyBackend = streamStateCopyBackend; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index df6fb17730..41e6c6c2c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -191,7 +191,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; - pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; @@ -243,7 +242,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; - pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateCopyBackend = streamStateCopyBackend; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7af64c041d..f922a5e03e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -254,6 +254,9 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); int32_t uploadCheckpointToS3(const char* id, const char* path); +int32_t deleteCheckpointFile(const char* id, const char* name); +int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ebde9fe50e..d3eba382c9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -19,7 +19,6 @@ #include "tcs.h" static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); -static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); #ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); @@ -230,8 +229,8 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream return code; } -static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, - int32_t transId) { +int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId) { int32_t code = 0; int32_t vgId = pTask->pMeta->vgId; int32_t taskLevel = pTask->info.taskLevel; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 89f0ea9e1f..7259c0e49a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -546,10 +546,6 @@ void streamStateDestroy(SStreamState* pState, bool remove) { taosMemoryFreeClear(pState); } -int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { - return deleteExpiredCheckPoint(pState->pFileState, mark); -} - void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { @@ -617,8 +613,6 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); } -void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); } - int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 05edad0f5f..aaff58d1b4 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -667,18 +667,6 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe } } -int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { - int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); - int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); - if (pFileState->searchBuff != NULL) { - deleteHashSortRowBuff(pFileState, pKey); - } - if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { - return TSDB_CODE_SUCCESS; - } - return TSDB_CODE_FAILED; -} - static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -868,10 +856,6 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } -int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { - return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list); -} - int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t code = TSDB_CODE_SUCCESS; int64_t maxCheckPointId = 0; @@ -1227,10 +1211,6 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; } -void setFillInfo(SStreamFileState* pFileState) { - pFileState->hasFillCatch = false; -} - void clearExpiredState(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1261,6 +1241,7 @@ _end: } } +#ifdef BUILD_NO_CALL int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -1328,6 +1309,7 @@ _end: } return code; } +#endif int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode) { diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index c8297d56b7..0f6370d1b6 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -390,9 +390,78 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) extern int8_t tsS3EpNum; tsS3EpNum = 1; - code = uploadCheckpointToS3("123", "/tmp/backend5/stream"); + code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream"); EXPECT_EQ(code, TSDB_CODE_SUCCESS); code = downloadCheckpointByNameS3("123", "/root/download", ""); EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); + + code = deleteCheckpointFile("aaa123", "bbb"); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); } + +TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.checkpointId = 123; + code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + pTask->chkInfo.pActiveInfo->failedId = 223; + code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + SStreamDataBlock block; + block.srcTaskId = 456; + SStreamTask upTask; + upTask = *pTask; + upTask.id.taskId = 456; + streamTaskSetUpstreamInfo(pTask, &upTask); + pTask->chkInfo.pActiveInfo->failedId = 23; + code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + streamTaskSetUpstreamInfo(pTask, &upTask); + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 223; + + STaskCheckpointReadyInfo readyInfo; + readyInfo.upstreamTaskId = 4567; + block.srcTaskId = 4567; + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1); + + initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1); + taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo); + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1; + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 1111; + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); +} \ No newline at end of file