diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7cd8391c80..0840694964 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -290,6 +290,7 @@ typedef struct SStreamStatus { int64_t lastExecTs; // last exec time stamp int32_t inScanHistorySentinel; bool appendTranstateBlock; // has append the transfer state data block already + bool removeBackendFiles; // remove backend files on disk when free stream tasks } SStreamStatus; typedef struct SDataRange { @@ -675,6 +676,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); +void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 544e820695..b5b5ef8755 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -584,18 +584,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } -static void tqStreamRemoveTaskBackend(SStreamMeta* pMeta, const STaskId* pId) { - char taskKey[128] = {0}; - sprintf(taskKey, "0x%" PRIx64 "-0x%x", pId->streamId, (int32_t)pId->taskId); - - char* path = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, taskKey); - taosRemoveDir(path); - - tqInfo("vgId:%d drop stream task:0x%x file:%s", pMeta->vgId, (int32_t)pId->taskId, path); - taosMemoryFree(path); -} - int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; @@ -616,6 +604,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen hTaskId.taskId = pTask->hTaskInfo.id.taskId; } + streamTaskSetRemoveBackendFiles(pTask); streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt); streamMetaReleaseTask(pMeta, pTask); } @@ -642,7 +631,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaWUnLock(pMeta); - tqStreamRemoveTaskBackend(pMeta, &id); +// tqStreamRemoveTaskBackend(pMeta, &id); return 0; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c6e580dce9..b6bbe2e383 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1968,11 +1968,15 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { if (cfNames != NULL) { rocksdb_list_column_families_destroy(cfNames, nCf); } + taosMemoryFree(err); err = NULL; cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); - ASSERT(err == NULL); + if (err != NULL) { + stError("%s failed to create column-family, %s, %d, reason:%s", key, dbPath, nCf, err); + goto _EXIT; + } } if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 39ed23730e..27bd001588 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1521,7 +1521,9 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas code = expandFn(pHTask); if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs); + streamMetaReleaseTask(pMeta, pHTask); + streamMetaReleaseTask(pMeta, pTask); return code; } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 72302f981d..2cb388954d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -187,8 +187,9 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { } void tFreeStreamTask(SStreamTask* pTask) { - char* p = NULL; - int32_t taskId = pTask->id.taskId; + char* p = NULL; + int32_t taskId = pTask->id.taskId; + STaskExecStatisInfo* pStatis = &pTask->execInfo; ETaskStatus status1 = TASK_STATUS__UNINIT; @@ -200,7 +201,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } taosThreadMutexUnlock(&pTask->lock); - stDebug("start to free s-task:0x%x, %p, state:%s", taskId, pTask, p); + stDebug("start to free s-task:0x%x %p, state:%s", taskId, pTask, p); SCheckpointInfo* pCkInfo = &pTask->chkInfo; stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 @@ -275,10 +276,6 @@ void tFreeStreamTask(SStreamTask* pTask) { taskDbRemoveRef(pTask->pBackend); } - if (pTask->id.idStr != NULL) { - taosMemoryFree((void*)pTask->id.idStr); - } - if (pTask->pNameMap) { tSimpleHashCleanup(pTask->pNameMap); } @@ -292,6 +289,19 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); + if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { + char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); + sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); + taosRemoveDir(path); + + stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); + taosMemoryFree(path); + } + + if (pTask->id.idStr != NULL) { + taosMemoryFree((void*)pTask->id.idStr); + } + taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); } @@ -896,4 +906,8 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { return code; } return streamTrySchedExec(pTask); +} + +void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { + pTask->status.removeBackendFiles = true; } \ No newline at end of file diff --git a/tests/system-test/1-insert/drop.py b/tests/system-test/1-insert/drop.py index 21817ef20d..493e1491b8 100644 --- a/tests/system-test/1-insert/drop.py +++ b/tests/system-test/1-insert/drop.py @@ -20,6 +20,7 @@ from util.common import * from util.sqlset import * class TDTestCase: + updatecfgDict = {'stdebugflag':143} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__)