From 3aed8c29ae6447cb1752eb007c3ebd0eeb3eafae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 17:12:20 +0800 Subject: [PATCH 1/4] refactor: set different tq level. --- source/dnode/vnode/src/tq/tq.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28e2235f3a..0304a9a650 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1821,17 +1821,17 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (pMeta->updateInfo.transId != req.transId) { pMeta->updateInfo.transId = req.transId; - tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); + tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); // info needs to be kept till the new trans to update the nodeEp arrived. taosHashClear(pMeta->updateInfo.pTasks); } else { - tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); + tqInfo("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); } STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); if (exist != NULL) { - tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, + tqInfo("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); @@ -1856,7 +1856,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->vgId, req.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { - tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); + tqInfo("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); } } @@ -1879,10 +1879,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (ppHTask != NULL) { streamTaskStop(*ppHTask); - tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); + tqInfo("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { - tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); + tqInfo("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); } rsp.code = 0; @@ -1894,22 +1894,22 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->startInfo.startAllTasksFlag = 1; if (updateTasks < numOfTasks) { - tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, + tqInfo("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); streamMetaWUnLock(pMeta); } else { if (!pTq->pVnode->restored) { - tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); + tqInfo("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startAllTasksFlag = 0; streamMetaWUnLock(pMeta); } else { - tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); + tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; streamMetaWUnLock(pMeta); while (streamMetaTaskInTimer(pMeta)) { - qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + tqInfo("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); } From 39175dc03b989ccc1a8c3463cc5bbdb1f03c1ab9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 18:07:36 +0800 Subject: [PATCH 2/4] fix(stream): fix deadlock in transfer state. --- source/libs/stream/src/streamExec.c | 11 ++++------- source/libs/stream/src/streamMeta.c | 8 ++------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fd2aa47ef2..7064e48641 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -329,6 +329,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // start the task state transfer procedure. char* p = NULL; streamTaskGetStatus(pStreamTask, &p); + if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 @@ -347,9 +348,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be - // pause, since the pause allowed attribute is not set yet. - streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume. + // 3. resume the state of stream task, after this function, the stream task will run immediately. + streamTaskResume(pStreamTask); stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); @@ -357,12 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. save to disk - streamMetaWLock(pMeta); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - streamMetaWUnLock(pMeta); - // 7. pause allowed. - streamTaskEnablePause(pStreamTask); + // 6. pause allowed. if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 887e879934..76945f17a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - while (pMeta->streamBackend == NULL) { + while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); taosMsleep(100); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - if (pMeta->streamBackend == NULL) { - stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); - } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); From 48aff868b6df8ef65d00042af3265046b7f2470b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 18:16:47 +0800 Subject: [PATCH 3/4] fix(stream):update the log level. --- source/dnode/vnode/src/tq/tq.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0304a9a650..13dec9ca54 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1825,13 +1825,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // info needs to be kept till the new trans to update the nodeEp arrived. taosHashClear(pMeta->updateInfo.pTasks); } else { - tqInfo("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); + tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); } STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); if (exist != NULL) { - tqInfo("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, + tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); @@ -1856,7 +1856,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->vgId, req.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { - tqInfo("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); + tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); } } @@ -1879,10 +1879,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (ppHTask != NULL) { streamTaskStop(*ppHTask); - tqInfo("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); + tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { - tqInfo("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); + tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); } rsp.code = 0; @@ -1894,12 +1894,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->startInfo.startAllTasksFlag = 1; if (updateTasks < numOfTasks) { - tqInfo("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); streamMetaWUnLock(pMeta); } else { if (!pTq->pVnode->restored) { - tqInfo("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); + tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.startAllTasksFlag = 0; streamMetaWUnLock(pMeta); } else { @@ -1909,7 +1909,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaWUnLock(pMeta); while (streamMetaTaskInTimer(pMeta)) { - tqInfo("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); } @@ -1931,11 +1931,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); tqLaunchStreamTaskAsync(pTq); } else { - vInfo("vgId:%d, follower node not start stream tasks", vgId); + tqInfo("vgId:%d, follower node not start stream tasks", vgId); } streamMetaWUnLock(pMeta); From b3d57e4a2de66806af596bb37a9523d506b58fc1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 19:12:36 +0800 Subject: [PATCH 4/4] fix(stream): not transfer state if state is not appropriate. --- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 ---- source/libs/stream/src/streamExec.c | 16 +++++++++++----- source/util/src/terror.c | 3 ++- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 124c8bac4c..8457f26967 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -804,6 +804,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) +#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 13dec9ca54..4802988f18 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1164,10 +1164,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - if (pTask->info.fillHistory == 1) { - ASSERT(pTask->status.pauseAllowed == true); - } - streamScanHistoryData(pTask); double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7064e48641..49f691c558 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -285,13 +285,14 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; + const char* id = pTask->id.idStr; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { stError( "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " "fill-history task", - pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); + id, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); @@ -304,7 +305,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamMetaWUnLock(pMeta); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, + stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id, pStreamTask->id.idStr); } @@ -318,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id); } // wait for the stream task to handle all in the inputQ, and to be idle @@ -328,7 +329,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. char* p = NULL; - streamTaskGetStatus(pStreamTask, &p); + status = streamTaskGetStatus(pStreamTask, &p); + if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) { + stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p); + streamMetaReleaseTask(pMeta, pStreamTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. @@ -351,7 +357,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 3. resume the state of stream task, after this function, the stream task will run immediately. streamTaskResume(pStreamTask); - stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id); // 4. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b5a4fc4008..92ab56b71b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -665,7 +665,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS,"Invalid task status to proceed") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")