From e547ed36990f45e24219156bb5b9362e69a38f39 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Jul 2023 16:30:14 +0800 Subject: [PATCH 1/2] rm expire checkpoint --- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 68 ++++++++++++++++--- source/libs/stream/src/streamMeta.c | 9 ++- 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index aa48f5cb29..17a28b8b82 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -47,6 +47,7 @@ typedef struct { void* streamBackendInit(const char* path); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); +int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9ceea4545b..bfcedd2d53 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -393,7 +393,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); char tbuf[256] = {0}; - sprintf(tbuf, "%s/checkpoint_%" PRId64 "", path, id); + sprintf(tbuf, "%s/checkpoint-%" PRId64 "", path, id); if (taosIsDir(tbuf)) { taosRemoveDir(tbuf); } @@ -402,11 +402,63 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { return 0; } +static int32_t compareCheckpoint(const void* a, const void* b) { + int64_t x = *(int64_t*)a; + int64_t y = *(int64_t*)b; + return x < y ? -1 : 1; +} + +int32_t streamBackendLoadCheckpointInfo(void* arg) { + SStreamMeta* pMeta = arg; + int32_t code = 0; + + int32_t len = strlen(pMeta->path) + 30; + char* checkpointPath = taosMemoryCalloc(1, len); + sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); + + if (!taosDirExist(checkpointPath)) { + return 0; + // no checkpoint, nothing to load + } + + TdDirPtr pDir = taosOpenDir(checkpointPath); + if (pDir == NULL) return 0; + + TdDirEntryPtr de = NULL; + SArray* suffix = taosArrayInit(4, sizeof(int64_t)); + + while ((de = taosReadDir(pDir)) != NULL) { + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + if (taosDirEntryIsDir(de)) { + char checkpointPrefix[32] = {0}; + int64_t checkpointId = 0; + + int ret = sscanf(taosGetDirEntryName(de), "checkpoint-%" PRId64 "", &checkpointId); + if (ret == 1) { + taosArrayPush(suffix, &checkpointId); + } + } else { + continue; + } + } + taosArraySort(suffix, compareCheckpoint); + + for (int i = 0; i < taosArrayGetSize(suffix); i++) { + int64_t id = *(int64_t*)taosArrayGet(suffix, i); + taosArrayPush(pMeta->checkpointSaved, &id); + } + + taosArrayDestroy(suffix); + taosCloseDir(&pDir); + taosMemoryFree(checkpointPath); + return 0; +} int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { - SStreamMeta* pMeta = arg; - int64_t backendRid = pMeta->streamBackendRid; - int64_t st = taosGetTimestampMs(); - int32_t code = -1; + SStreamMeta* pMeta = arg; + int64_t backendRid = pMeta->streamBackendRid; + int64_t st = taosGetTimestampMs(); + int32_t code = -1; char path[256] = {0}; sprintf(path, "%s/%s", pMeta->path, "checkpoints"); @@ -417,7 +469,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } char checkpointDir[256] = {0}; - snprintf(checkpointDir, tListLen(checkpointDir),"%s/checkpoint_%" PRIu64, path, checkpointId); + snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint-%" PRId64, path, checkpointId); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL) { @@ -1203,8 +1255,8 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } return true; } -rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, rocksdb_snapshot_t** snapshot, - rocksdb_readoptions_t** readOpt) { +rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, + rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) { int idx = streamStateGetCfIdx(pState, pChkptFileName); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7d4e7e4615..b2dabd6356 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -100,6 +100,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (pMeta->streamBackend == NULL) { goto _err; } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -108,6 +109,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->checkpointCap = 4; taosInitRWLatch(&pMeta->checkpointDirLock); + code = streamBackendLoadCheckpointInfo(pMeta); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } + taosMemoryFree(streamPath); taosInitRWLatch(&pMeta->lock); @@ -310,7 +317,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); - while(1) { + while (1) { taosRLockLatch(&pMeta->lock); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); From 6298f17c45921aaaf8c34b71eb2b4d95d8da0170 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 11 Jul 2023 19:21:27 +0800 Subject: [PATCH 2/2] delete invalid code --- include/libs/executor/executor.h | 3 --- source/libs/executor/inc/querytask.h | 2 -- source/libs/executor/src/executor.c | 6 ------ source/libs/executor/src/timewindowoperator.c | 14 -------------- source/libs/stream/src/streamExec.c | 2 -- 5 files changed, 27 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3bef15f3a7..fe98007109 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -99,9 +99,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); -// todo refactor -void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId); - /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index cdf37bcc6b..0d7c3925af 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -69,8 +69,6 @@ typedef struct { SVersionRange fillHistoryVer; STimeWindow fillHistoryWindow; SStreamState* pState; - int64_t dataVersion; - int64_t checkPointId; } SStreamTaskInfo; struct SExecTaskInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 531be3ea62..f3d4882f00 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -223,12 +223,6 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { return code; } -void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) { - SExecTaskInfo* pTaskInfo = tinfo; - *dataVer = pTaskInfo->streamInfo.dataVersion; - *ckId = pTaskInfo->streamInfo.checkPointId; -} - int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fd04bdac04..7d90c7e644 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2318,11 +2318,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { - pTaskInfo->streamInfo.dataVersion = version; - pTaskInfo->streamInfo.checkPointId = ckId; -} - static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; @@ -2823,7 +2818,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CHECKPOINT) { doStreamIntervalSaveCheckpoint(pOperator); pAPI->stateStore.streamStateCommit(pInfo->pState); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); copyDataBlock(pInfo->pCheckpointRes, pBlock); pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, @@ -3086,7 +3080,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, pOperator); taosMemoryFree(buff); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); } return pOperator; @@ -3953,7 +3946,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CHECKPOINT) { doStreamSessionSaveCheckpoint(pOperator); pAggSup->stateStore.streamStateCommit(pAggSup->pState); - setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; } else { @@ -4154,7 +4146,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (res == TSDB_CODE_SUCCESS) { doStreamSessionDecodeOpState(buff, pOperator); taosMemoryFree(buff); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); } setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -4256,7 +4247,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CHECKPOINT) { doStreamSessionSaveCheckpoint(pOperator); pAggSup->stateStore.streamStateCommit(pAggSup->pState); - setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); pOperator->status = OP_RES_TO_RETURN; continue; } else { @@ -4681,7 +4671,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CHECKPOINT) { doStreamSessionSaveCheckpoint(pOperator); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); - setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; } else { @@ -4878,7 +4867,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys if (res == TSDB_CODE_SUCCESS) { doStreamStateDecodeOpState(buff, pOperator); taosMemoryFree(buff); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); } setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, @@ -5548,7 +5536,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CHECKPOINT) { doStreamIntervalSaveCheckpoint(pOperator); pAPI->stateStore.streamStateCommit(pInfo->pState); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); pInfo->reCkBlock = true; copyDataBlock(pInfo->pCheckpointRes, pBlock); qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); @@ -5735,7 +5722,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, pOperator); taosMemoryFree(buff); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); } initIntervalDownStream(downstream, pPhyNode->type, pInfo); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d4b6f0927d..73e4c00627 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,8 +18,6 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 -static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId); - bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);