From fd96b096ea98b584a74fba0cf871703831362bfe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Feb 2025 23:41:06 +0800 Subject: [PATCH 1/8] refactor(stream): drop task in async ways --- include/dnode/vnode/tqCommon.h | 2 +- include/libs/executor/executor.h | 2 +- include/libs/stream/tstream.h | 10 ++++-- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 34 ++++++++++++------ source/libs/executor/src/executor.c | 14 ++++++-- source/libs/stream/src/streamCheckpoint.c | 24 ++++++------- source/libs/stream/src/streamMeta.c | 42 ++++++++++++++++++---- source/libs/stream/src/streamTask.c | 8 +++-- 11 files changed, 99 insertions(+), 43 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4d5e18520c..69a11638b1 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -34,7 +34,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 883c5f7b99..5b4224d575 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode); +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waiting); bool qTaskIsExecuting(qTaskInfo_t qinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a4d89dcdcc..cc12566389 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,6 +58,7 @@ extern "C" { #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) +#define STREAM_EXEC_T_DROP_ONE_TASK (-8) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -752,15 +753,20 @@ void streamMetaCleanup(); int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId, int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); + +int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store +int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey); + int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); +int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); + int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); + void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6eee8c510b..6899140af9 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -155,7 +155,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { } case TDMT_STREAM_TASK_DROP: - return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessDropReq(pSnode->pMeta, &pSnode->msgCb, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 7d83dbcf84..1f755f816e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1302,7 +1302,7 @@ _checkpoint: } streamMetaWLock(pMeta); - if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) { streamMetaWUnLock(pMeta); taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c0f73249c..bd2a7207f2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1118,7 +1118,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); + return tqStreamTaskProcessDropReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, msg, msgLen); } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index bcf60f5c32..fce2d83490 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -260,13 +260,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // stream do update the nodeEp info, write it into stream meta. if (updated) { tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId); - code = streamMetaSaveTask(pMeta, pTask); + code = streamMetaSaveTaskInMeta(pMeta, pTask); if (code) { tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code)); } if (pHTask != NULL) { - code = streamMetaSaveTask(pMeta, pHTask); + code = streamMetaSaveTaskInMeta(pMeta, pHTask); if (code) { tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code)); } @@ -688,7 +688,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -720,29 +720,40 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); - code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK); if (code) { - tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId, + tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId, + vgId, (int32_t)hTaskId.taskId, tstrerror(code)); + } else { + tqDebug("s-task:0x%x vgId:%d create msg to drop rel fill-history task:0x%x succ", pReq->taskId, vgId, (int32_t)hTaskId.taskId); } } // drop the stream task now - code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); + code = streamTaskSchedTask(cb, vgId, pReq->streamId, pReq->taskId, STREAM_EXEC_T_DROP_ONE_TASK); if (code) { - tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); + tqError("s-task:0x%x vgId:%d failed to create msg to drop task, code:%s", pReq->taskId, vgId, tstrerror(code)); + } else { + tqDebug("s-task:0x%x vgId:%d create msg to drop succ", pReq->taskId, vgId); } +// code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); +// if (code) { +// tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); +// } + // commit the update - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); +// int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); +// tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pMeta) < 0) { // persist to disk } streamMetaWUnLock(pMeta); + + tqDebug("vgId:%d process drop task:0x%x async completed", vgId, pReq->taskId); return 0; // always return success } @@ -857,6 +868,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; + } else if (type == STREAM_EXEC_T_DROP_ONE_TASK) { + code = streamMetaDropTask(pMeta, req.streamId, req.taskId); + return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1386b0b82f..3d60e4d797 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -972,20 +972,28 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { + int64_t st = taosGetTimestampMs(); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); + qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); while (1) { taosWLockLatch(&pTaskInfo->lock); if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again taosWUnLockLatch(&pTaskInfo->lock); - taosMsleep(100); + + taosMsleep(200); + + int64_t d = taosGetTimestampMs() - st; + if (d >= waitingDuration && waitingDuration >= 0) { + qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); + return TSDB_CODE_SUCCESS; + } } else { // not running now pTaskInfo->code = rspCode; taosWUnLockLatch(&pTaskInfo->lock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a49e50547..d62165a72c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -625,14 +625,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", - id, vgId, pReq->taskId, numOfTasks); - } - - if (pReq->dropRelHTask) { - code = streamMetaCommit(pMeta); + code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); + if (code) { + stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); + } } } @@ -697,7 +693,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pTask->status.taskStatus = TASK_STATUS__READY; - code = streamMetaSaveTask(pMeta, pTask); + code = streamMetaSaveTaskInMeta(pMeta, pTask); streamMutexUnlock(&pTask->lock); if (code != TSDB_CODE_SUCCESS) { @@ -708,10 +704,12 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, - (int32_t)pReq->hTaskId, numOfTasks); + code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); + if (code) { + stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); + } else { + stDebug("s-task:%s vgId:%d create msg to drop related fill-history task:0x%x", id, vgId, (int32_t)pReq->hTaskId); + } } code = streamMetaCommit(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9a2eeb9311..5ec7ed7761 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -632,7 +632,7 @@ void streamMetaCloseImpl(void* arg) { } // todo let's check the status for each task -int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { +int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t vgId = pTask->pMeta->vgId; void* buf = NULL; int32_t len; @@ -682,7 +682,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return code; } -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { +int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) { int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { @@ -705,7 +705,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p != NULL) { - stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId); + stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId); tFreeStreamTask(pTask); return code; } @@ -735,7 +735,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return code; } - if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) { int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); void* pUnused = taosArrayPop(pMeta->pTaskList); @@ -885,6 +885,8 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { int32_t code = 0; + int32_t waitingDuration = 5000; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); if (code) { @@ -895,7 +897,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { // let's kill the query procedure within stream, to end it ASAP. if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { - code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -932,7 +934,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - code = streamMetaRemoveTask(pMeta, &id); + code = streamMetaRemoveTaskInMeta(pMeta, &id); if (code) { stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code)); } @@ -963,6 +965,32 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t return 0; } +int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + SStreamTask* pTask = NULL; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = 0; + + streamMetaWLock(pMeta); + + code = streamMetaUnregisterTask(pMeta, streamId, taskId); + numOfTasks = streamMetaGetNumOfTasks(pMeta); + if (code) { + stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); + } + + code = streamMetaCommit(pMeta); + if (code) { + stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); + } else { + stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks); + } + + streamMetaWUnLock(pMeta); + + return code; +} + int32_t streamMetaBegin(SStreamMeta* pMeta) { streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, @@ -1185,7 +1213,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { STaskId* pId = taosArrayGet(pRecycleList, i); - code = streamMetaRemoveTask(pMeta, pId); + code = streamMetaRemoveTaskInMeta(pMeta, pId); if (code) { stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code)); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d27ed520c6..f0b288f096 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -703,7 +703,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { } if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { - code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code)); } @@ -862,7 +862,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { pStreamTask->status.taskStatus = TASK_STATUS__READY; } - code = streamMetaSaveTask(pMeta, pStreamTask); + code = streamMetaSaveTaskInMeta(pMeta, pStreamTask); streamMutexUnlock(&(pStreamTask->lock)); streamMetaReleaseTask(pMeta, pStreamTask); @@ -1025,7 +1025,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { // in case of fill-history task, stop the tsdb file scan operation. if (pTask->info.fillHistory == 1) { void* pExecutor = pTask->exec.pExecutor; - code = qKillTask(pExecutor, TSDB_CODE_SUCCESS); + code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000); } stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); @@ -1287,6 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; + case STREAM_EXEC_T_DROP_ONE_TASK: + return "drop-one-task"; case 0: return "exec-all-tasks"; default: From 43d45e9f9a4a20a424aa21881abe9cc6a616cc34 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Feb 2025 23:43:07 +0800 Subject: [PATCH 2/8] refactor(stream): drop task in async ways --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- tests/ci/func.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index fce2d83490..35edccd9fe 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -720,7 +720,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, in // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK); + code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, (int32_t)hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK); if (code) { tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId, vgId, (int32_t)hTaskId.taskId, tstrerror(code)); diff --git a/tests/ci/func.txt b/tests/ci/func.txt index 45d4fb1c11..c724568537 100644 --- a/tests/ci/func.txt +++ b/tests/ci/func.txt @@ -79,7 +79,7 @@ (void)streamMetaAddFailedTask (void)streamMetaAddTaskLaunchResult (void)streamMetaCommit -(void)streamMetaRemoveTask +(void)streamMetaRemoveTaskInMeta (void)streamMetaSendHbHelper (void)streamMetaStartAllTasks (void)streamMetaStartOneTask From 25cdfa5ee93fb49a9b4b6af8be108135869b8147 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Feb 2025 18:34:20 +0800 Subject: [PATCH 3/8] refactor(stream): add long exec stream queue for history tasks in step1 and re-calculate task execution. --- include/common/tmsgcb.h | 1 + include/util/tworker.h | 2 +- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 4 ++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 18 ++++--- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 8 ++- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 59 ++++++++++++++++++--- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 22 ++++++-- source/libs/executor/src/projectoperator.c | 8 --- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamStartHistory.c | 2 +- source/util/src/tworker.c | 7 ++- 12 files changed, 99 insertions(+), 35 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index c934cb6961..2847f4278a 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -38,6 +38,7 @@ typedef enum { STREAM_QUEUE, ARB_QUEUE, STREAM_CTRL_QUEUE, + STREAM_LONG_EXEC_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/util/tworker.h b/include/util/tworker.h index a3ba7dba6d..bc0dde1a37 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -76,7 +76,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool); void tAutoQWorkerCleanup(SAutoQWorkerPool *pool); -STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp); +STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum); void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue); int32_t tWWorkerInit(SWWorkerPool *pool); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 84f5149624..9b4c11d6ae 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -32,6 +32,7 @@ typedef struct SVnodeMgmt { const char *name; SQueryAutoQWorkerPool queryPool; SAutoQWorkerPool streamPool; + SAutoQWorkerPool streamLongExecPool; SWWorkerPool streamCtrlPool; SWWorkerPool fetchPool; SSingleWorker mgmtWorker; @@ -75,6 +76,7 @@ typedef struct { STaosQueue *pQueryQ; STaosQueue *pStreamQ; STaosQueue *pStreamCtrlQ; + STaosQueue *pStreamLongExecQ; STaosQueue *pFetchQ; STaosQueue *pMultiMgmQ; } SVnodeObj; @@ -137,6 +139,8 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); + int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 234d4f41e1..1dea7d3cad 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1008,27 +1008,29 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index d71e0b02c4..6f30977e10 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -398,10 +398,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId, pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ)); - while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(50); dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ); - while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(50); + + dInfo("vgId:%d, wait for vnode stream long-exec queue:%p is empty, %d remains", pVnode->vgId, + pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ)); + while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50); dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index b398bdf242..5acd06bbda 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -150,7 +150,7 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_ SRpcMsg *pMsg = pItem; const STraceId *trace = &pMsg->info.traceId; - dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg); + dGTrace("vgId:%d, msg:%p get from vnode-stream-ctrl queue", pVnode->vgId, pMsg); code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { terrno = code; @@ -165,6 +165,26 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_ } } +static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + const STraceId *trace = &pMsg->info.traceId; + int32_t code = 0; + + dGTrace("vgId:%d, msg:%p get from vnode-stream long-exec queue", pVnode->vgId, pMsg); + + code = vnodeProcessStreamLongExecMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + terrno = code; + dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), + tstrerror(code)); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; SRpcMsg *pMsg = NULL; @@ -274,9 +294,13 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp code = taosWriteQitem(pVnode->pStreamQ, pMsg); break; case STREAM_CTRL_QUEUE: - dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg); + dGTrace("vgId:%d, msg:%p put into vnode-stream-ctrl queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg); break; + case STREAM_LONG_EXEC_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg); + code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg); + break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); @@ -335,6 +359,8 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMs int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); } +int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); } + int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); @@ -409,6 +435,10 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { break; case STREAM_CTRL_QUEUE: size = taosQueueItemSize(pVnode->pStreamCtrlQ); + break; + case STREAM_LONG_EXEC_QUEUE: + size = taosQueueItemSize(pVnode->pStreamLongExecQ); + break; default: break; } @@ -451,13 +481,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); - pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); + + // init stream msg processing queue family + pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2); pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue); + pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1); if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL - || pVnode->pStreamCtrlQ == NULL) { + || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -473,6 +506,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, taosQueueGetThreadId(pVnode->pFetchQ)); dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); + dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ); dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ, taosQueueGetThreadId(pVnode->pStreamCtrlQ)); return 0; @@ -481,17 +515,22 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); + tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ); tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; + pVnode->pStreamQ = NULL; pVnode->pStreamCtrlQ = NULL; - pVnode->pFetchQ = NULL; + pVnode->pStreamLongExecQ = NULL; + dDebug("vgId:%d, queue is freed", pVnode->vgId); } int32_t vmStartWorker(SVnodeMgmt *pMgmt) { - int32_t code = 0; + int32_t code = 0; + SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; pQPool->min = tsNumOfVnodeQueryThreads; @@ -505,8 +544,13 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pStreamPool->ratio = tsRatioOfVnodeStreamThreads; if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code; + SAutoQWorkerPool *pLongExecPool = &pMgmt->streamLongExecPool; + pLongExecPool->name = "vnode-stream-long-exec"; + pLongExecPool->ratio = tsRatioOfVnodeStreamThreads/3; + if ((code = tAutoQWorkerInit(pLongExecPool)) != 0) return code; + SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool; - pStreamCtrlPool->name = "vnode-ctrl-stream"; + pStreamCtrlPool->name = "vnode-stream-ctrl"; pStreamCtrlPool->max = 1; if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; @@ -541,6 +585,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { void vmStopWorker(SVnodeMgmt *pMgmt) { tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tAutoQWorkerCleanup(&pMgmt->streamPool); + tAutoQWorkerCleanup(&pMgmt->streamLongExecPool); tWWorkerCleanup(&pMgmt->streamCtrlPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 50d75c4838..871a8c06e1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -113,6 +113,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6190f4b0a7..f37dd94106 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -931,9 +931,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg); - if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || - pMsg->msgType == TDMT_VND_BATCH_META) && - !syncIsReadyForRead(pVnode->sync)) { + if (!syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } @@ -945,8 +943,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY: - return tqProcessTaskScanHistory(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); default: @@ -993,6 +989,22 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn } } +int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("vgId:%d, msg:%p in stream long exec queue is processing", pVnode->config.vgId, pMsg); + if (!syncIsReadyForRead(pVnode->sync)) { + vnodeRedirectRpcMsg(pVnode, pMsg, terrno); + return 0; + } + + switch (pMsg->msgType) { + case TDMT_VND_STREAM_SCAN_HISTORY: + return tqProcessTaskScanHistory(pVnode->pTq, pMsg); + default: + vError("unknown msg type:%d in stream long exec queue", pMsg->msgType); + return TSDB_CODE_APP_ERROR; + } +} + void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); if (code) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index cb91bae691..0aab1511a4 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -42,9 +42,7 @@ typedef struct SIndefOperatorInfo { } SIndefOperatorInfo; static int32_t doGenerateSourceData(SOperatorInfo* pOperator); -static SSDataBlock* doProjectOperation1(SOperatorInfo* pOperator); static int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -static SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator); static int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock); static int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList); static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, @@ -557,12 +555,6 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp } } -SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) { - SSDataBlock* pResBlock = NULL; - pOperator->pTaskInfo->code = doApplyIndefinitFunction(pOperator, &pResBlock); - return pResBlock; -} - int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); SIndefOperatorInfo* pIndefInfo = pOperator->info; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ee34648a47..4e9e236507 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -875,7 +875,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } double el = (taosGetTimestampMs() - st) / 1000.0; - if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore + if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id); streamTaskSetIdleInfo(pTask, 500); return code; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 54a8929123..f8b1b5ecbc 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -76,7 +76,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { memcpy(serializedReq, &req, len); SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY}; - return tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg); + return tmsgPutToQueue(pTask->pMsgCb, STREAM_LONG_EXEC_QUEUE, &rpcMsg); } void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index dbd8cb159e..469f98fcf0 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -256,7 +256,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { return NULL; } -STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) { +STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) { int32_t code; STaosQueue *queue; @@ -280,7 +280,10 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t curWorkerNum = taosArrayGetSize(pool->workers); int32_t dstWorkerNum = ceilf(queueNum * pool->ratio); - if (dstWorkerNum < 2) dstWorkerNum = 2; + + if (dstWorkerNum < minNum) { + dstWorkerNum = minNum; + } // spawn a thread to process queue while (curWorkerNum < dstWorkerNum) { From 8848ae61a42d7990d0ff4490056168ec010ddc08 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 15 Feb 2025 00:00:45 +0800 Subject: [PATCH 4/8] fix(stream): add missing release --- source/dnode/vnode/src/tqCommon/tqCommon.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 35edccd9fe..29b9141759 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1231,6 +1231,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m streamMetaReleaseTask(pMeta, pHTask); } + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } From 465e5eabd777a255393167a6d6a1b4ce2f66e423 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 15 Feb 2025 23:20:05 +0800 Subject: [PATCH 5/8] fix(stream): drop task in synch model in write thread. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 27 ++++++----------- source/libs/executor/src/executor.c | 34 +++++++++++----------- source/libs/stream/src/streamCheckpoint.c | 32 ++++++++++---------- source/libs/stream/src/streamMeta.c | 26 ++++++++--------- source/libs/stream/src/streamTask.c | 4 +-- 6 files changed, 59 insertions(+), 66 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cc12566389..1f12b8addd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,7 +58,7 @@ extern "C" { #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) -#define STREAM_EXEC_T_DROP_ONE_TASK (-8) +#define STREAM_EXEC_T_STOP_ONE_TASK (-8) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 29b9141759..817868a1d6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -720,40 +720,31 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, in // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, (int32_t)hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK); + tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); if (code) { - tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId, - vgId, (int32_t)hTaskId.taskId, tstrerror(code)); - } else { - tqDebug("s-task:0x%x vgId:%d create msg to drop rel fill-history task:0x%x succ", pReq->taskId, vgId, + tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId, (int32_t)hTaskId.taskId); } } // drop the stream task now - code = streamTaskSchedTask(cb, vgId, pReq->streamId, pReq->taskId, STREAM_EXEC_T_DROP_ONE_TASK); + code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); if (code) { - tqError("s-task:0x%x vgId:%d failed to create msg to drop task, code:%s", pReq->taskId, vgId, tstrerror(code)); - } else { - tqDebug("s-task:0x%x vgId:%d create msg to drop succ", pReq->taskId, vgId); + tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); } -// code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); -// if (code) { -// tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); -// } - // commit the update -// int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); -// tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pMeta) < 0) { // persist to disk } streamMetaWUnLock(pMeta); + tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId); - tqDebug("vgId:%d process drop task:0x%x async completed", vgId, pReq->taskId); return 0; // always return success } @@ -868,7 +859,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; - } else if (type == STREAM_EXEC_T_DROP_ONE_TASK) { + } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) { code = streamMetaDropTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3d60e4d797..f271679e37 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -982,24 +982,24 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); - while (1) { - taosWLockLatch(&pTaskInfo->lock); - if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again - taosWUnLockLatch(&pTaskInfo->lock); - - taosMsleep(200); - - int64_t d = taosGetTimestampMs() - st; - if (d >= waitingDuration && waitingDuration >= 0) { - qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); - return TSDB_CODE_SUCCESS; - } - } else { // not running now - pTaskInfo->code = rspCode; - taosWUnLockLatch(&pTaskInfo->lock); +// while (1) { +// taosWLockLatch(&pTaskInfo->lock); +// if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again +// taosWUnLockLatch(&pTaskInfo->lock); +// +// taosMsleep(200); +// +// int64_t d = taosGetTimestampMs() - st; +// if (d >= waitingDuration && waitingDuration >= 0) { +// qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); +// return TSDB_CODE_SUCCESS; +// } +// } else { // not running now +// pTaskInfo->code = rspCode; +// taosWUnLockLatch(&pTaskInfo->lock); return TSDB_CODE_SUCCESS; - } - } +// } +// } } bool qTaskIsExecuting(qTaskInfo_t qinfo) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d62165a72c..a43cdd0b85 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -622,15 +622,19 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pReq->transId); streamMutexUnlock(&pTask->lock); - { // destroy the related fill-history tasks - // drop task should not in the meta-lock, and drop the related fill-history task now - if (pReq->dropRelHTask) { - code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); - if (code) { - stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); - } - } - } + { // destroy the related fill-history tasks + // drop task should not in the meta-lock, and drop the related fill-history task now + if (pReq->dropRelHTask) { + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", + id, vgId, pReq->taskId, numOfTasks); + } + + if (pReq->dropRelHTask) { + code = streamMetaCommit(pMeta); + } + } // always return true return TSDB_CODE_SUCCESS; @@ -704,12 +708,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); - if (code) { - stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); - } else { - stDebug("s-task:%s vgId:%d create msg to drop related fill-history task:0x%x", id, vgId, (int32_t)pReq->hTaskId); - } + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, + (int32_t)pReq->hTaskId, numOfTasks); } code = streamMetaCommit(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ec7ed7761..c7e0567d20 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -965,7 +965,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t return 0; } -int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +int32_t streamMetaStopTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -973,18 +973,18 @@ int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) streamMetaWLock(pMeta); - code = streamMetaUnregisterTask(pMeta, streamId, taskId); - numOfTasks = streamMetaGetNumOfTasks(pMeta); - if (code) { - stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); - } - - code = streamMetaCommit(pMeta); - if (code) { - stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); - } else { - stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks); - } +// code = streamMetaUnregisterTask(pMeta, streamId, taskId); +// numOfTasks = streamMetaGetNumOfTasks(pMeta); +// if (code) { +// stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); +// } +// +// code = streamMetaCommit(pMeta); +// if (code) { +// stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); +// } else { +// stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks); +// } streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f0b288f096..e4e8a37b37 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1287,8 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; - case STREAM_EXEC_T_DROP_ONE_TASK: - return "drop-one-task"; + case STREAM_EXEC_T_STOP_ONE_TASK: + return "stop-one-task"; case 0: return "exec-all-tasks"; default: From c4cd6dd62d0f75ce3e84d6b4e22d5b75f420f92d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Feb 2025 10:06:09 +0800 Subject: [PATCH 6/8] fix(stream): drop task in synch model in write thread. --- include/dnode/vnode/tqCommon.h | 2 +- include/libs/executor/executor.h | 2 +- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +- source/libs/executor/src/executor.c | 49 +++++++++++++--------- source/libs/stream/src/streamMeta.c | 4 +- source/libs/stream/src/streamStartTask.c | 1 - 9 files changed, 38 insertions(+), 30 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 69a11638b1..4d5e18520c 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -34,7 +34,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5b4224d575..988a650d6c 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waiting); +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration); bool qTaskIsExecuting(qTaskInfo_t qinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1f12b8addd..b7d560e060 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -760,7 +760,6 @@ int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); -int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); @@ -800,6 +799,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6899140af9..6eee8c510b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -155,7 +155,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { } case TDMT_STREAM_TASK_DROP: - return tqStreamTaskProcessDropReq(pSnode->pMeta, &pSnode->msgCb, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bd2a7207f2..1c0f73249c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1118,7 +1118,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessDropReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, msg, msgLen); + return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 817868a1d6..76f0fd6bc6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -688,7 +688,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -860,7 +860,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) { - code = streamMetaDropTask(pMeta, req.streamId, req.taskId); + code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f271679e37..1aa114a02a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -972,34 +972,43 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) { int64_t st = taosGetTimestampMs(); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); + if (waitDuration > 0) { + qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0); + } else { + qDebug("%s async killed execTask", GET_TASKID(pTaskInfo)); + } + setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); -// while (1) { -// taosWLockLatch(&pTaskInfo->lock); -// if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again -// taosWUnLockLatch(&pTaskInfo->lock); -// -// taosMsleep(200); -// -// int64_t d = taosGetTimestampMs() - st; -// if (d >= waitingDuration && waitingDuration >= 0) { -// qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); -// return TSDB_CODE_SUCCESS; -// } -// } else { // not running now -// pTaskInfo->code = rspCode; -// taosWUnLockLatch(&pTaskInfo->lock); - return TSDB_CODE_SUCCESS; -// } -// } + if (waitDuration > 0) { + while (1) { + taosWLockLatch(&pTaskInfo->lock); + if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again + taosWUnLockLatch(&pTaskInfo->lock); + + taosMsleep(200); + + int64_t d = taosGetTimestampMs() - st; + if (d >= waitDuration && waitDuration >= 0) { + qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0); + return TSDB_CODE_SUCCESS; + } + } else { // not running now + pTaskInfo->code = rspCode; + taosWUnLockLatch(&pTaskInfo->lock); + return TSDB_CODE_SUCCESS; + } + } + } + + return TSDB_CODE_SUCCESS; } bool qTaskIsExecuting(qTaskInfo_t qinfo) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c7e0567d20..780a369409 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -897,7 +897,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { // let's kill the query procedure within stream, to end it ASAP. if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { - code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -965,7 +965,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t return 0; } -int32_t streamMetaStopTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; int32_t code = 0; int32_t vgId = pMeta->vgId; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 9c16ff036e..d9ca506849 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -447,7 +447,6 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { continue; } - int64_t refId = pTask->id.refId; int32_t ret = streamTaskStop(pTask); if (ret) { stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret)); From 870fe1c071d3b9a8e6eb6901677945ef2944834c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 20 Feb 2025 11:52:04 +0800 Subject: [PATCH 7/8] fix(tsma):drop invalid state for tsma --- include/libs/executor/storageapi.h | 1 + include/libs/stream/streamState.h | 1 + include/libs/stream/tstreamFileState.h | 1 + source/dnode/snode/src/snodeInitApi.c | 1 + source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/libs/executor/src/scanoperator.c | 6 +++- .../executor/src/streamtimewindowoperator.c | 30 ++++++++++++++++++- source/libs/stream/src/streamState.c | 4 +++ source/libs/stream/src/tstreamFileState.c | 26 ++++++++++++++++ 9 files changed, 69 insertions(+), 2 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 3cc2acf30f..3ed5d82f98 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -361,6 +361,7 @@ typedef struct SStateStore { bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); void (*streamStateDel)(SStreamState* pState, const SWinKey* key); + void (*streamStateDelByGroupId)(SStreamState* pState, uint64_t groupId); void (*streamStateClear)(SStreamState* pState); void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex); void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index b4e0087b1a..bab1d9438c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -44,6 +44,7 @@ int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, in bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); void streamStateDel(SStreamState* pState, const SWinKey* key); +void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId); void streamStateClear(SStreamState* pState); void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index f47c308e18..7463d4d130 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -59,6 +59,7 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t* pWinCode); void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); +void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 68dc981338..a35baa0092 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -43,6 +43,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCheck = streamStateCheck; pStore->streamStateGetByPos = streamStateGetByPos; pStore->streamStateDel = streamStateDel; + pStore->streamStateDelByGroupId = streamStateDelByGroupId; pStore->streamStateClear = streamStateClear; pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateGetInfo = streamStateGetInfo; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index b8682028cf..0b6c13d2db 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -166,6 +166,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCheck = streamStateCheck; pStore->streamStateGetByPos = streamStateGetByPos; pStore->streamStateDel = streamStateDel; + pStore->streamStateDelByGroupId = streamStateDelByGroupId; pStore->streamStateClear = streamStateClear; pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateGetInfo = streamStateGetInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1060cbcffe..ce2a5019a6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3818,7 +3818,11 @@ FETCH_NEXT_BLOCK: int32_t deleteNum = 0; code = deletePartName(pInfo, pBlock, &deleteNum); QUERY_CHECK_CODE(code, lino, _end); - if (deleteNum == 0) goto FETCH_NEXT_BLOCK; + if (deleteNum == 0) { + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo)); + qDebug("===stream=== ignore block type 18, delete num is 0"); + goto FETCH_NEXT_BLOCK; + } } break; case STREAM_CHECKPOINT: { qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 031d2e8bdc..5cab26b9d3 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -237,6 +237,29 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } +static void doDeleteWindowByGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + + SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* pGroupIdData = (uint64_t*)pGpIdCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + uint64_t groupId = pGroupIdData[i]; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { + size_t keyLen = 0; + SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + if (pKey->groupId == groupId) { + int32_t tmpRes = tSimpleHashIterateRemove(pInfo->aggSup.pResultRowHashTable, pKey, keyLen, &pIte, &iter); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); + } + } + + pAPI->stateStore.streamStateDelByGroupId(pInfo->pState, groupId); + } +} + static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { int32_t code = TSDB_CODE_SUCCESS; @@ -5232,7 +5255,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); continue; - } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) { + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pBlock; + return code; + } else if (pBlock->info.type == STREAM_DROP_CHILD_TABLE) { + doDeleteWindowByGroupId(pOperator, pBlock); printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); (*ppRes) = pBlock; return code; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 7259c0e49a..621be05e84 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -224,6 +224,10 @@ void streamStateDel(SStreamState* pState, const SWinKey* key) { deleteRowBuff(pState->pFileState, key, sizeof(SWinKey)); } +void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId) { + deleteRowBuffByGroupId(pState->pFileState, groupId); +} + int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { return streamStateFillPut_rocksdb(pState, key, value, vLen); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index aaff58d1b4..d6dfde1ee6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -667,6 +667,32 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe } } +void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) { + SSHashObj* pRowMap = pFileState->rowStateBuff; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) { + size_t keyLen = 0; + SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + if (pKey->groupId == groupId) { + int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); + } + } + + while (1) { + SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId}; + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp); + SWinKey delKey = {.groupId = groupId}; + int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0); + if (code != TSDB_CODE_SUCCESS) { + break; + } + code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey); + qTrace("%s at line %d res:%d", __func__, __LINE__, code); + } +} + static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; From d20d21c3c4721cd0bff647fd6c4df7eb4bc38bdb Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Thu, 20 Feb 2025 14:29:00 +0800 Subject: [PATCH 8/8] Doc(config):Add more info abort tsz compressor. --- docs/zh/14-reference/01-components/01-taosd.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 8dc0257cfe..06897a68d6 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -1441,7 +1441,7 @@ charset 的有效值是 UTF-8。 - 取值范围:float/double/none - 默认值:none,表示关闭无损压缩 - 动态修改:不支持 -- 支持版本:从 v3.3.0.0 前支持 +- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃 #### ifAdtFse - 说明:在启用 TSZ 有损压缩时,使用 FSE 算法替换 HUFFMAN 算法,FSE 算法压缩速度更快,但解压稍慢,追求压缩速度可选用此算法 @@ -1450,22 +1450,22 @@ charset 的有效值是 UTF-8。 - 最小值:0 - 最大值:1 - 动态修改:支持通过 SQL 修改,重启生效 -- 支持版本:从 v3.1.0.0 版本开始引入 +- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃 #### maxRange - 说明:用于有损压缩设置 `内部参数` - 动态修改:支持通过 SQL 修改,重启生效 -- 支持版本:从 v3.1.0.0 版本开始引入 +- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃 #### curRange - 说明:用于有损压缩设置 `内部参数` - 动态修改:支持通过 SQL 修改,重启生效 -- 支持版本:从 v3.1.0.0 版本开始引入 +- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃 #### compressor - 说明:用于有损压缩设置 `内部参数` - 动态修改:支持通过 SQL 修改,重启生效 -- 支持版本:从 v3.1.0.0 版本开始引入 +- 支持版本:从v3.1.0.0 版本引入,v3.3.0.0 以后废弃 **补充说明** 1. 在 3.3.5.0 之后,所有配置参数都将被持久化到本地存储,重启数据库服务后,将默认使用持久化的配置参数列表;如果您希望继续使用 config 文件中配置的参数,需设置 forceReadConfig 为 1。