fix(stream): remove backend files only after the in-memory task is dropped.
This commit is contained in:
parent
042ed3caff
commit
abd90c733a
|
@ -290,6 +290,7 @@ typedef struct SStreamStatus {
|
||||||
int64_t lastExecTs; // last exec time stamp
|
int64_t lastExecTs; // last exec time stamp
|
||||||
int32_t inScanHistorySentinel;
|
int32_t inScanHistorySentinel;
|
||||||
bool appendTranstateBlock; // has append the transfer state data block already
|
bool appendTranstateBlock; // has append the transfer state data block already
|
||||||
|
bool removeBackendFiles; // remove backend files on disk when free stream tasks
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
||||||
typedef struct SDataRange {
|
typedef struct SDataRange {
|
||||||
|
@ -675,6 +676,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key);
|
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key);
|
||||||
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
||||||
|
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask);
|
||||||
|
|
||||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
|
|
|
@ -584,18 +584,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tqStreamRemoveTaskBackend(SStreamMeta* pMeta, const STaskId* pId) {
|
|
||||||
char taskKey[128] = {0};
|
|
||||||
sprintf(taskKey, "0x%" PRIx64 "-0x%x", pId->streamId, (int32_t)pId->taskId);
|
|
||||||
|
|
||||||
char* path = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
|
|
||||||
sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, taskKey);
|
|
||||||
taosRemoveDir(path);
|
|
||||||
|
|
||||||
tqInfo("vgId:%d drop stream task:0x%x file:%s", pMeta->vgId, (int32_t)pId->taskId, path);
|
|
||||||
taosMemoryFree(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
|
||||||
|
@ -616,6 +604,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
hTaskId.taskId = pTask->hTaskInfo.id.taskId;
|
hTaskId.taskId = pTask->hTaskInfo.id.taskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamTaskSetRemoveBackendFiles(pTask);
|
||||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
|
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
@ -642,7 +631,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
tqStreamRemoveTaskBackend(pMeta, &id);
|
// tqStreamRemoveTaskBackend(pMeta, &id);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1968,11 +1968,15 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
|
||||||
if (cfNames != NULL) {
|
if (cfNames != NULL) {
|
||||||
rocksdb_list_column_families_destroy(cfNames, nCf);
|
rocksdb_list_column_families_destroy(cfNames, nCf);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
err = NULL;
|
err = NULL;
|
||||||
|
|
||||||
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
|
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
|
||||||
ASSERT(err == NULL);
|
if (err != NULL) {
|
||||||
|
stError("%s failed to create column-family, %s, %d, reason:%s", key, dbPath, nCf, err);
|
||||||
|
goto _EXIT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) {
|
if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) {
|
||||||
|
|
|
@ -1521,7 +1521,9 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
code = expandFn(pHTask);
|
code = expandFn(pHTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs);
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pHTask);
|
streamMetaReleaseTask(pMeta, pHTask);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,6 +189,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
|
||||||
void tFreeStreamTask(SStreamTask* pTask) {
|
void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
int32_t taskId = pTask->id.taskId;
|
int32_t taskId = pTask->id.taskId;
|
||||||
|
|
||||||
STaskExecStatisInfo* pStatis = &pTask->execInfo;
|
STaskExecStatisInfo* pStatis = &pTask->execInfo;
|
||||||
|
|
||||||
ETaskStatus status1 = TASK_STATUS__UNINIT;
|
ETaskStatus status1 = TASK_STATUS__UNINIT;
|
||||||
|
@ -200,7 +201,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
stDebug("start to free s-task:0x%x, %p, state:%s", taskId, pTask, p);
|
stDebug("start to free s-task:0x%x %p, state:%s", taskId, pTask, p);
|
||||||
|
|
||||||
SCheckpointInfo* pCkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pCkInfo = &pTask->chkInfo;
|
||||||
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
||||||
|
@ -275,10 +276,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
taskDbRemoveRef(pTask->pBackend);
|
taskDbRemoveRef(pTask->pBackend);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->id.idStr != NULL) {
|
|
||||||
taosMemoryFree((void*)pTask->id.idStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->pNameMap) {
|
if (pTask->pNameMap) {
|
||||||
tSimpleHashCleanup(pTask->pNameMap);
|
tSimpleHashCleanup(pTask->pNameMap);
|
||||||
}
|
}
|
||||||
|
@ -292,6 +289,19 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||||
|
|
||||||
|
if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
|
||||||
|
char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
|
||||||
|
sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr);
|
||||||
|
taosRemoveDir(path);
|
||||||
|
|
||||||
|
stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path);
|
||||||
|
taosMemoryFree(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->id.idStr != NULL) {
|
||||||
|
taosMemoryFree((void*)pTask->id.idStr);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
stDebug("s-task:0x%x free task completed", taskId);
|
stDebug("s-task:0x%x free task completed", taskId);
|
||||||
}
|
}
|
||||||
|
@ -897,3 +907,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||||
}
|
}
|
||||||
return streamTrySchedExec(pTask);
|
return streamTrySchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) {
|
||||||
|
pTask->status.removeBackendFiles = true;
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ from util.common import *
|
||||||
from util.sqlset import *
|
from util.sqlset import *
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
|
updatecfgDict = {'stdebugflag':143}
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
|
Loading…
Reference in New Issue