From 0bcfe11e84d57e4d13a14548237f7c7fb987a73b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 5 Aug 2024 11:55:36 +0800 Subject: [PATCH] refactor remve backend code --- include/libs/stream/tstream.h | 2 +- source/libs/stream/inc/streamBackendRocksdb.h | 3 +++ source/libs/stream/src/streamBackendRocksdb.c | 17 ++++++++++++-- source/libs/stream/src/streamTask.c | 22 ++++++++++--------- source/libs/stream/src/streamTaskSm.c | 2 +- 5 files changed, 32 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 90cb06ff42..9c59e3f3ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -534,7 +534,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); -void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status); +void streamFreeTaskState(SStreamTask* pTask, int8_t remove); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 0f158591b4..3bb4532db3 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -81,6 +81,7 @@ typedef struct { int64_t dataWritten; void* pMeta; + int8_t removeAllFiles; } STaskDbWrapper; @@ -152,6 +153,8 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); +void taskDbRemoveAllFiles(void* pTaskDb); + int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); void streamStateDestroyCompar(void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7396c6b7c6..8498c9118a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2331,6 +2331,15 @@ void taskDbRemoveRef(void* pTaskDb) { (void)taosReleaseRef(taskDbWrapperId, pBackend->refId); } +void taskDbRemoveAllFiles(void* pTaskDb) { + if (pTaskDb == NULL) { + return; + } + + STaskDbWrapper* pBackend = pTaskDb; + atomic_store_8(&pBackend->removeAllFiles, 1); +} + void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_env_t* env = rocksdb_create_default_env(); @@ -2573,8 +2582,7 @@ void taskDbDestroy(void* pDb, bool flush) { stDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = tListLen(ginitDict); - - if (flush) { + if (flush && wrapper->removeAllFiles == 0) { if (wrapper->db && wrapper->pCf) { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_set_wait(flushOpt, 1); @@ -2636,6 +2644,11 @@ void taskDbDestroy(void* pDb, bool flush) { taskDbDestroyChkpOpt(wrapper); taosMemoryFree(wrapper->idstr); + + if (wrapper->removeAllFiles) { + char* err = NULL; + taosRemoveDir(wrapper->path); + } taosMemoryFree(wrapper->path); taosMemoryFree(wrapper); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f07fd81953..90167e446e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -275,7 +275,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); - streamFreeTaskState(pTask, status1); + streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0); if (pTask->pNameMap) { tSimpleHashCleanup(pTask->pNameMap); @@ -296,14 +296,14 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = NULL; - if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { - char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); - sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); - taosRemoveDir(path); + // if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { + // char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); + // sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); + // taosRemoveDir(path); - stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); - taosMemoryFree(path); - } + // stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); + // taosMemoryFree(path); + // } if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); @@ -316,10 +316,12 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } -void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) { +void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { if (pTask->pState != NULL) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); - streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + streamStateClose(pTask->pState, remove); + + taskDbRemoveAllFiles(pTask->pBackend); taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index d3c39da6bd..04969c2b48 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -98,7 +98,7 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv static int32_t stopTaskSuccFn(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; - streamFreeTaskState(pTask, pSM->current.state); + streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0); return TSDB_CODE_SUCCESS; }