diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 124c8bac4c..8457f26967 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -804,6 +804,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) +#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28e2235f3a..4802988f18 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1164,10 +1164,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - if (pTask->info.fillHistory == 1) { - ASSERT(pTask->status.pauseAllowed == true); - } - streamScanHistoryData(pTask); double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; @@ -1821,7 +1817,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (pMeta->updateInfo.transId != req.transId) { pMeta->updateInfo.transId = req.transId; - tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); + tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); // info needs to be kept till the new trans to update the nodeEp arrived. taosHashClear(pMeta->updateInfo.pTasks); } else { @@ -1903,13 +1899,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->startInfo.startAllTasksFlag = 0; streamMetaWUnLock(pMeta); } else { - tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); + tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId); terrno = 0; streamMetaWUnLock(pMeta); while (streamMetaTaskInTimer(pMeta)) { - qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); } @@ -1931,11 +1927,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); tqLaunchStreamTaskAsync(pTq); } else { - vInfo("vgId:%d, follower node not start stream tasks", vgId); + tqInfo("vgId:%d, follower node not start stream tasks", vgId); } streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fd2aa47ef2..49f691c558 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -285,13 +285,14 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; + const char* id = pTask->id.idStr; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { stError( "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " "fill-history task", - pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); + id, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); @@ -304,7 +305,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamMetaWUnLock(pMeta); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, + stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id, pStreamTask->id.idStr); } @@ -318,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id); } // wait for the stream task to handle all in the inputQ, and to be idle @@ -328,7 +329,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. char* p = NULL; - streamTaskGetStatus(pStreamTask, &p); + status = streamTaskGetStatus(pStreamTask, &p); + if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) { + stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p); + streamMetaReleaseTask(pMeta, pStreamTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 @@ -347,22 +354,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be - // pause, since the pause allowed attribute is not set yet. - streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume. + // 3. resume the state of stream task, after this function, the stream task will run immediately. + streamTaskResume(pStreamTask); - stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id); // 4. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. save to disk - streamMetaWLock(pMeta); pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - streamMetaWUnLock(pMeta); - // 7. pause allowed. - streamTaskEnablePause(pStreamTask); + // 6. pause allowed. if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 887e879934..76945f17a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - while (pMeta->streamBackend == NULL) { + while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); taosMsleep(100); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - if (pMeta->streamBackend == NULL) { - stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); - } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b5a4fc4008..92ab56b71b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -665,7 +665,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS,"Invalid task status to proceed") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")