From 74b7bf6bbcffeee18d293a59ee2a6363b532ea12 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Apr 2024 05:37:35 +0000 Subject: [PATCH] drop stream data --- source/dnode/vnode/src/tqCommon/tqCommon.c | 43 ++++++++++++++-------- tests/script/tsim/stream/basic0.sim | 2 + 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1c3a760bab..b304ef3531 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -552,7 +552,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe typedef struct SMStreamCheckpointReadyRspMsg { SMsgHead head; -}SMStreamCheckpointReadyRspMsg; +} SMStreamCheckpointReadyRspMsg; int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; @@ -675,6 +675,17 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } +void tqStreamRmTaskBackend(SStreamMeta* pMeta, STaskId* id) { + char taskKey[128] = {0}; + sprintf(taskKey, "0x%" PRIx64 "-0x%x", id->streamId, (int32_t)id->taskId); + + char* path = taosMemoryCalloc(1, strlen(pMeta->path) + 128); + sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, taskKey); + taosRemoveDir(path); + taosMemoryFree(path); + // do nothing +} + int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; int32_t vgId = pMeta->vgId; @@ -718,6 +729,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen if (streamMetaCommit(pMeta) < 0) { // persist to disk } + tqStreamRmTaskBackend(pMeta, &id); streamMetaWUnLock(pMeta); return 0; @@ -823,7 +835,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { streamMetaStopAllTasks(pMeta); return 0; - } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while + } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask != NULL) { @@ -849,8 +861,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { - tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - p, pTask->chkInfo.nextProcessVer); + tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, + pTask->id.idStr, p, pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); @@ -938,7 +950,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ +int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); @@ -975,9 +987,10 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){ return TSDB_CODE_SUCCESS; } -static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated, bool fromVnode) { - SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; - int32_t vgId = pMeta->vgId; +static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated, + bool fromVnode) { + SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; + int32_t vgId = pMeta->vgId; if (pTask == NULL) { return -1; } @@ -999,7 +1012,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t // discard all the data when the stream task is suspended. walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 - ", schedStatus:%d", + ", schedStatus:%d", vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus); } else { // from the previous paused version and go on tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", @@ -1025,11 +1038,11 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t return 0; } -int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode){ +int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); + SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); if (code != 0) { return code; } @@ -1043,9 +1056,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m return code; } -int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { - return taosArrayGetSize(pMeta->pTaskList); -} +int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); diff --git a/tests/script/tsim/stream/basic0.sim b/tests/script/tsim/stream/basic0.sim index a525594861..5a99aadf36 100644 --- a/tests/script/tsim/stream/basic0.sim +++ b/tests/script/tsim/stream/basic0.sim @@ -171,6 +171,8 @@ if $data13 != 789 then goto loop2 endi +sql drop stream s1 + _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check