Merge branch 'fix/syntaxRefact' into fix/syntax

This commit is contained in:
yihaoDeng 2024-08-05 12:04:21 +08:00
commit 59586b0169
5 changed files with 32 additions and 14 deletions

View File

@ -534,7 +534,7 @@ void tFreeStreamTask(SStreamTask* pTask);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status); void streamFreeTaskState(SStreamTask* pTask, int8_t remove);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);

View File

@ -81,6 +81,7 @@ typedef struct {
int64_t dataWritten; int64_t dataWritten;
void* pMeta; void* pMeta;
int8_t removeAllFiles;
} STaskDbWrapper; } STaskDbWrapper;
@ -152,6 +153,8 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
void* taskDbAddRef(void* pTaskDb); void* taskDbAddRef(void* pTaskDb);
void taskDbRemoveRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb);
void taskDbRemoveAllFiles(void* pTaskDb);
int streamStateOpenBackend(void* backend, SStreamState* pState); int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove); void streamStateCloseBackend(SStreamState* pState, bool remove);
void streamStateDestroyCompar(void* arg); void streamStateDestroyCompar(void* arg);

View File

@ -2331,6 +2331,15 @@ void taskDbRemoveRef(void* pTaskDb) {
(void)taosReleaseRef(taskDbWrapperId, pBackend->refId); (void)taosReleaseRef(taskDbWrapperId, pBackend->refId);
} }
void taskDbRemoveAllFiles(void* pTaskDb) {
if (pTaskDb == NULL) {
return;
}
STaskDbWrapper* pBackend = pTaskDb;
atomic_store_8(&pBackend->removeAllFiles, 1);
}
void taskDbInitOpt(STaskDbWrapper* pTaskDb) { void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_env_t* env = rocksdb_create_default_env(); rocksdb_env_t* env = rocksdb_create_default_env();
@ -2573,8 +2582,7 @@ void taskDbDestroy(void* pDb, bool flush) {
stDebug("succ to destroy stream backend:%p", wrapper); stDebug("succ to destroy stream backend:%p", wrapper);
int8_t nCf = tListLen(ginitDict); int8_t nCf = tListLen(ginitDict);
if (flush && wrapper->removeAllFiles == 0) {
if (flush) {
if (wrapper->db && wrapper->pCf) { if (wrapper->db && wrapper->pCf) {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1); rocksdb_flushoptions_set_wait(flushOpt, 1);
@ -2636,6 +2644,11 @@ void taskDbDestroy(void* pDb, bool flush) {
taskDbDestroyChkpOpt(wrapper); taskDbDestroyChkpOpt(wrapper);
taosMemoryFree(wrapper->idstr); taosMemoryFree(wrapper->idstr);
if (wrapper->removeAllFiles) {
char* err = NULL;
taosRemoveDir(wrapper->path);
}
taosMemoryFree(wrapper->path); taosMemoryFree(wrapper->path);
taosMemoryFree(wrapper); taosMemoryFree(wrapper);

View File

@ -275,7 +275,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
streamFreeTaskState(pTask, status1); streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
if (pTask->pNameMap) { if (pTask->pNameMap) {
tSimpleHashCleanup(pTask->pNameMap); tSimpleHashCleanup(pTask->pNameMap);
@ -296,14 +296,14 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
pTask->outputInfo.pNodeEpsetUpdateList = NULL; pTask->outputInfo.pNodeEpsetUpdateList = NULL;
if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { // if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); // char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); // sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr);
taosRemoveDir(path); // taosRemoveDir(path);
stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); // stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path);
taosMemoryFree(path); // taosMemoryFree(path);
} // }
if (pTask->id.idStr != NULL) { if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr); taosMemoryFree((void*)pTask->id.idStr);
@ -316,10 +316,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x free task completed", taskId); stDebug("s-task:0x%x free task completed", taskId);
} }
void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) { void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
if (pTask->pState != NULL) { if (pTask->pState != NULL) {
stDebug("s-task:0x%x start to free task state", pTask->id.taskId); stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); streamStateClose(pTask->pState, remove);
taskDbRemoveAllFiles(pTask->pBackend);
taskDbRemoveRef(pTask->pBackend); taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL; pTask->pBackend = NULL;
pTask->pState = NULL; pTask->pState = NULL;

View File

@ -98,7 +98,7 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
static int32_t stopTaskSuccFn(SStreamTask* pTask) { static int32_t stopTaskSuccFn(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM; SStreamTaskSM* pSM = pTask->status.pSM;
streamFreeTaskState(pTask, pSM->current.state); streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }