diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b89ea93c7b..55dba97862 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -807,6 +807,7 @@ int32_t* taosGetErrno(); // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) +#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7a14f8349f..e8662c74df 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1064,6 +1064,47 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms return code; } +static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { + const char* id = pTask->id.idStr; + int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; + + // if it's an source task, extract the last version in wal. + SVersionRange *pRange = &pTask->dataRange.range; + + bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); + pTask->execInfo.step2Start = taosGetTimestampMs(); + + if (done) { + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); + streamTaskPutTranstateIntoInputQ(pTask); + streamExecTask(pTask); // exec directly + } else { + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history from WAL after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + pStreamTask->id.idStr); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); + + int64_t dstVer = pTask->dataRange.range.minVer; + pTask->chkInfo.nextProcessVer = dstVer; + + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, + pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); + + /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); + + // now the fill-history task starts to scan data from wal files. + int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + if (code == TSDB_CODE_SUCCESS) { + tqScanWalAsync(pTq, false); + } + } +} + // this function should be executed by only one thread int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; @@ -1146,7 +1187,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { - SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; // 1. get the related stream task @@ -1165,50 +1205,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; - - // if it's an source task, extract the last version in wal. - pRange = &pTask->dataRange.range; - bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); - pTask->execInfo.step2Start = taosGetTimestampMs(); - - if (done) { - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); - streamTaskPutTranstateIntoInputQ(pTask); -// streamTaskRestoreStatus(pTask); - -// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { -// pTask->status.keepTaskStatus = TASK_STATUS__READY; -// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id, -// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); -// } - - streamExecTask(pTask); // exec directly + code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); + if (code == TSDB_CODE_SUCCESS) { + doStartStep2(pTask, pStreamTask, pTq); } else { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, - pStreamTask->id.idStr); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - - int64_t dstVer = pTask->dataRange.range.minVer; - pTask->chkInfo.nextProcessVer = dstVer; - - walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); - tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, - pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); - - /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); - - // now the fill-history task starts to scan data from wal files. - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - if (code == TSDB_CODE_SUCCESS) { - tqScanWalAsync(pTq, false); - } + tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); } streamMetaReleaseTask(pMeta, pStreamTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index e7993ac673..2d94f23009 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -59,6 +59,7 @@ int32_t tqScanWal(STQ* pTq) { } int32_t tqStartStreamTask(STQ* pTq) { + int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -102,12 +103,16 @@ int32_t tqStartStreamTask(STQ* pTq) { } EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - streamTaskHandleEvent(pTask->status.pSM, event); + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + streamMetaReleaseTask(pMeta, pTask); } taosArrayDestroy(pTaskList); - return 0; + return code; } int32_t tqLaunchStreamTaskAsync(STQ* pTq) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 6c91bbf2d8..508efe9856 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -82,13 +82,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { return 0; } -int32_t streamTaskSetReadyForWal(SStreamTask* pTask) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); - } - return TSDB_CODE_SUCCESS; -} - static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { stDebug("s-task:%s start to do checkpoint", pTask->id.idStr); return 0; @@ -108,9 +101,31 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { } // todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE -static bool isUnsupportedTransform(ETaskStatus state, const EStreamTaskEvent event) { - if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT) { - if (event == TASK_EVENT_SCANHIST_DONE || event == TASK_EVENT_CHECKPOINT_DONE || event == TASK_EVENT_GEN_CHECKPOINT) { +static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) { + if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) { + return (state != TASK_STATUS__UNINIT); + } + + if (event == TASK_EVENT_SCANHIST_DONE) { + return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY); + } + + if (event == TASK_EVENT_GEN_CHECKPOINT) { + return (state != TASK_STATUS__READY); + } + + if (event == TASK_EVENT_CHECKPOINT_DONE) { + return (state != TASK_STATUS__CK); + } + + // todo refactor later + if (event == TASK_EVENT_RESUME) { + return true; + } + + if (event == TASK_EVENT_HALT) { + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP || + state == TASK_STATUS__SCAN_HISTORY) { return true; } } @@ -128,7 +143,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream } } - if (isUnsupportedTransform(state, event)) { + if (isInvalidStateTransfer(state, event)) { return NULL; } else { ASSERT(0); @@ -219,7 +234,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt taosMsleep(100); } else { stDebug("s-task:%s is dropped or stopped already, not wait.", id); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } } @@ -260,7 +275,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_INVALID_PARA; // todo: set new error code// failed to handle the event. + return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pSM->pActiveTrans != NULL) { @@ -303,14 +318,14 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even taosThreadMutexUnlock(&pTask->lock); stDebug("s-task:%s unlockx", pTask->id.idStr); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } if (pTrans->event != event) { stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event)); taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } keepPrevInfo(pSM); @@ -469,6 +484,10 @@ void doInitStateTransferTable(void) { streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + streamTaskKeepCurrentVerInWal, NULL, true); + taosArrayPush(streamTaskSMTrans, &trans); + SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 12a37a041a..8bebe7f34e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -670,6 +670,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer") + // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")