From 54ec679b5809ddd58245cb520cdbd9a8f93f61d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 Oct 2023 15:34:53 +0800 Subject: [PATCH] refactor(stream): do some internal refactor. --- include/libs/stream/tstream.h | 40 +++- source/dnode/snode/src/snode.c | 27 ++- source/dnode/vnode/src/tq/tq.c | 116 +++++------ source/dnode/vnode/src/tq/tqStreamTask.c | 25 ++- source/libs/stream/inc/streamsm.h | 24 +-- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamDispatch.c | 9 +- source/libs/stream/src/streamExec.c | 32 +-- source/libs/stream/src/streamMeta.c | 3 +- source/libs/stream/src/streamStart.c | 235 ++++++++++++++--------- source/libs/stream/src/streamTask.c | 19 +- source/libs/stream/src/streamTaskSm.c | 187 ++++++++++++++---- 12 files changed, 465 insertions(+), 254 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6eb1ed6e9b..41f260fdc3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -44,11 +44,19 @@ extern "C" { #define NODE_ROLE_LEADER 0x2 #define NODE_ROLE_FOLLOWER 0x3 +#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0) +#define CLEAR_RELATED_FILLHISTORY_TASK(_t) \ + do { \ + (_t)->hTaskInfo.id.taskId = 0; \ + (_t)->hTaskInfo.id.streamId = 0; \ + } while (0); + typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; #define SSTREAM_TASK_VER 2 + enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -119,6 +127,21 @@ enum { STREAM_META_OK_TO_STOP = 2, }; +typedef enum EStreamTaskEvent { + TASK_EVENT_INIT = 0x1, + TASK_EVENT_INIT_SCAN_HISTORY = 0x2, + TASK_EVENT_SCANHIST_COMPLETED = 0x3, + TASK_EVENT_START = 0x4, + TASK_EVENT_STOP = 0x5, + TASK_EVENT_GEN_CHECKPOINT = 0x6, + TASK_EVENT_PAUSE = 0x7, + TASK_EVENT_RESUME = 0x8, + TASK_EVENT_HALT = 0x9, + TASK_EVENT_TRANS_STATE = 0xA, + TASK_EVENT_SCAN_TSDB = 0xB, + TASK_EVENT_SCAN_WAL = 0xC, +} EStreamTaskEvent; + typedef struct { int8_t type; } SStreamQueueItem; @@ -351,6 +374,7 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; + int64_t haltVer; // offset in wal when halt the stream task } SHistoryTaskInfo; typedef struct STaskOutputInfo { @@ -692,20 +716,28 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); -void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); +ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr); -char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); -int32_t streamTaskStartScanHistory(SStreamTask* pTask); +int32_t onNormalTaskReady(SStreamTask* pTask); +int32_t onScanhistoryTaskReady(SStreamTask* pTask); + +//int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); bool streamTaskAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); -int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask); +int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); + +int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM); +void streamTaskRestoreStatus(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6451dba2da..4015ba9c61 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -92,11 +92,13 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer } } - qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", + char* p = NULL; + streamTaskGetStatus(pTask, &p); + + qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->info.triggerParam); + pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam); return 0; } @@ -174,10 +176,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); taosWUnLockLatch(&pSnode->pMeta->lock); - qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); - streamTaskCheckDownstream(pTask); + char* p = NULL; + streamTaskGetStatus(pTask, &p); + + qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, + pTask->id.idStr, p, numOfTasks); + + ASSERT(0); +// streamTaskCheckDownstream(pTask); return 0; } @@ -352,10 +359,10 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); streamMetaReleaseTask(pSnode->pMeta, pTask); - - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + char* p = NULL; + streamTaskGetStatus(pTask, &p); qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", - pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c90812d95..01dea11235 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -817,28 +817,29 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } // sink - if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - pTask->outputInfo.smaSink.vnode = pTq->pVnode; - pTask->outputInfo.smaSink.smaSink = smaHandleRes; - } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - pTask->outputInfo.tbSink.vnode = pTq->pVnode; - pTask->outputInfo.tbSink.tbSinkFunc = tqSinkDataIntoDstTable; + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + if (pOutputInfo->type == TASK_OUTPUT__SMA) { + pOutputInfo->smaSink.vnode = pTq->pVnode; + pOutputInfo->smaSink.smaSink = smaHandleRes; + } else if (pOutputInfo->type == TASK_OUTPUT__TABLE) { + pOutputInfo->tbSink.vnode = pTq->pVnode; + pOutputInfo->tbSink.tbSinkFunc = tqSinkDataIntoDstTable; int32_t ver1 = 1; SMetaInfo info = {0}; - code = metaGetInfo(pTq->pVnode->pMeta, pTask->outputInfo.tbSink.stbUid, &info, NULL); + code = metaGetInfo(pTq->pVnode->pMeta, pOutputInfo->tbSink.stbUid, &info, NULL); if (code == TSDB_CODE_SUCCESS) { ver1 = info.skmVer; } - SSchemaWrapper* pschemaWrapper = pTask->outputInfo.tbSink.pSchemaWrapper; - pTask->outputInfo.tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); - if (pTask->outputInfo.tbSink.pTSchema == NULL) { + SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper; + pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); + if (pOutputInfo->tbSink.pTSchema == NULL) { return -1; } - pTask->outputInfo.tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - tSimpleHashSetFreeFp(pTask->outputInfo.tbSink.pTblInfo, freePtr); + pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr); } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -863,20 +864,23 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } + char* p = NULL; + streamTaskGetStatus(pTask, &p); + if (pTask->info.fillHistory) { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); + pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, + (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); } else { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); + pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); } return 0; @@ -918,9 +922,10 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); streamMetaReleaseTask(pMeta, pTask); - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + char* p = NULL; + streamTaskGetStatus(pTask, &p); tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 @@ -1023,11 +1028,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); bool restored = pTq->pVnode->restored; - if (p != NULL && restored) { - p->execInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->execInfo.init); - - streamTaskCheckDownstream(p); + if (p != NULL && restored && p->info.fillHistory == 0) { + EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY; + streamTaskHandleEvent(p->status.pSM, event); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); } @@ -1061,7 +1064,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + char* pStatus = NULL; + streamTaskGetStatus(pTask, &pStatus); // avoid multi-thread exec while(1) { @@ -1115,7 +1119,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - int8_t status = streamTaskSetSchedStatusInActive(pTask); + int8_t status = streamTaskSetSchedStatusInactive(pTask); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); @@ -1124,12 +1128,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // the following procedure should be executed, no matter status is stop/pause or not - tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); + tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; - bool done = false; // 1. get the related stream task pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); @@ -1148,10 +1151,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - +#if 0 // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the // stream task get ready for scan history data - while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) { tqDebug( "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); @@ -1209,21 +1212,26 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosThreadMutexUnlock(&pStreamTask->lock); break; } +#endif + + streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); + int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; - done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); + bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); streamTaskPutTranstateIntoInputQ(pTask); + streamTaskRestoreStatus(pTask); - if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; - qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id, - streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); - } +// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { +// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; +// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id, +// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); +// } streamExecTask(pTask); // exec directly } else { @@ -1243,35 +1251,29 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); - /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); - + /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); +#if 0 // the fill-history task starts to process data in wal, let's set it status to be normal now if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { streamSetStatusNormal(pTask); } +#endif + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED); tqScanWalAsync(pTq, false); } streamMetaReleaseTask(pMeta, pStreamTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - if (pTask->hTaskInfo.id.taskId == 0) { - *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; - tqDebug( - "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " - "window:%" PRId64 " - %" PRId64, - id, pWindow->skey, pWindow->ekey); - qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor); - } else { - // when related fill-history task exists, update the fill-history time window only when the - // state transfer is completed. - tqDebug( - "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " - "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, - id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); - } + // Not update the fill-history time window until the state transfer is completed if the related fill-history task + // exists. + tqDebug( + "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " + "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, + id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); code = streamTaskScanHistoryDataComplete(pTask); } @@ -1360,17 +1362,17 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); - if (pTask != NULL) { - // even in halt status, the data in inputQ must be processed - int8_t st = pTask->status.taskStatus; + if (pTask != NULL) { // even in halt status, the data in inputQ must be processed + char* p = NULL; + ETaskStatus st = streamTaskGetStatus(pTask, &p); if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { - int8_t status = streamTaskSetSchedStatusInActive(pTask); + int8_t status = streamTaskSetSchedStatusInactive(pTask); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, - pTask->id.idStr, streamGetTaskStatusStr(st), status); + pTask->id.idStr, p, status); } streamMetaReleaseTask(pTq->pStreamMeta, pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b9cb22e7a4..29de14e3c3 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -99,12 +99,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { continue; } - pTask->execInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->execInfo.init); - - streamSetStatusNormal(pTask); - streamTaskCheckDownstream(pTask); - + EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY; + streamTaskHandleEvent(pTask->status.pSM, event); streamMetaReleaseTask(pMeta, pTask); } @@ -113,8 +109,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { } int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { @@ -328,9 +324,11 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) { } // not in ready state, do not handle the data from wal - int32_t status = pTask->status.taskStatus; - if (status != TASK_STATUS__NORMAL) { - tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); +// int32_t status = pTask->status.taskStatus; + char* p = NULL; + int32_t status = streamTaskGetStatus(pTask, &p); + if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) { + tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, p); return false; } @@ -449,9 +447,10 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { taosThreadMutexLock(&pTask->lock); - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { - tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + if (status != TASK_STATUS__NORMAL) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, p); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index c2a351a297..39ea2a77f8 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -28,34 +28,26 @@ typedef struct SStreamTaskState { char* name; } SStreamTaskState; -typedef enum EStreamTaskEvent { - TASK_EVENT_INIT = 0x1, - TASK_EVENT_START = 0x2, - TASK_EVENT_STOP = 0x3, - TASK_EVENT_GEN_CHECKPOINT = 0x4, - TASK_EVENT_PAUSE = 0x5, - TASK_EVENT_RESUME = 0x6, - TASK_EVENT_HALT = 0x7, - TASK_EVENT_TRANS_STATE = 0x8, - TASK_EVENT_SCAN_TSDB = 0x9, - TASK_EVENT_SCAN_WAL = 0x10, -} EStreamTaskEvent; - typedef int32_t (*__state_trans_fn)(SStreamTask*); +typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); typedef struct STaskStateTrans { + bool autoInvokeEndFn; SStreamTaskState state; EStreamTaskEvent event; SStreamTaskState next; + __state_trans_fn preAction; __state_trans_fn pAction; + __state_trans_succ_fn pSuccAction; } STaskStateTrans; struct SStreamTaskSM { - SStreamTaskState current; - SArray* pTransList; // SArray - int64_t stateTs; SStreamTask* pTask; + SArray* pTransList; // SArray STaskStateTrans* pActiveTrans; + int64_t startTs; + SStreamTaskState current; + SStreamTaskState prev; }; typedef struct SStreamEventInfo { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 03ba796b2c..8910616f01 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -127,7 +127,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); + /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr); return -1; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a7a06dd884..39afc3c98a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -662,8 +662,10 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { int32_t numOfVgs = taosArrayGetSize(vgInfo); pTask->notReadyTasks = numOfVgs; + char* p = NULL; + streamTaskGetStatus(pTask, &p); stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr, - numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); + numOfVgs, p); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.downstreamTaskId = pVgInfo->taskId; @@ -775,8 +777,9 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); tmsgSendReq(pEpSet, &msg); - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, + char* p = NULL; + streamTaskGetStatus(pTask, &p); + stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, p, pReq->downstreamTaskId, vgId); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c49c647906..ca67e19901 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -318,12 +318,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // todo. the dropping status should be append to the status after the halt completed. // It must be halted for a source stream task, since when the related scan-history-data task start scan the history // for the step 2. - int8_t status = pStreamTask->status.taskStatus; + int8_t status = streamTaskGetStatus(pStreamTask, NULL);//pStreamTask->status.taskStatus; if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING); } else { ASSERT(status == TASK_STATUS__NORMAL); - pStreamTask->status.taskStatus = TASK_STATUS__HALT; + streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); +// pStreamTask->status.taskStatus = TASK_STATUS__HALT; stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } @@ -337,9 +338,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", - pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, - pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); + ", status:normal, sched-status:%d", + pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, + pTimeWindow->ekey, pStreamTask->status.schedStatus); } else { stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } @@ -362,8 +363,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info - pStreamTask->hTaskInfo.id.taskId = 0; - pStreamTask->hTaskInfo.id.streamId = 0; + CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); // 6. save to disk taosWLockLatch(&pMeta->lock); @@ -505,7 +505,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock code = streamTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { - /*int8_t status = */ streamTaskSetSchedStatusInActive(pTask); + /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); } } @@ -592,8 +592,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { - stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus)); + char* p = NULL; + streamTaskGetStatus(pTask, &p); + stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); streamTaskBuildCheckpoint(pTask); return 0; } @@ -628,15 +629,18 @@ int32_t streamExecTask(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + char* p = NULL; + streamTaskGetStatus(pTask, &p); + stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus); return 0; } taosThreadMutexUnlock(&pTask->lock); } } else { - stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + char* p = NULL; + streamTaskGetStatus(pTask, &p); + stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p, + pTask->status.schedStatus); } return 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f7b0cdb0f1..2773912437 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -518,8 +518,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } taosWUnLockLatch(&pMeta->lock); - stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId, - streamGetTaskStatusStr(TASK_STATUS__DROPPING)); + stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); while (1) { taosRLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 061efc94fb..8ebc1c0094 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -17,6 +17,7 @@ #include "trpc.h" #include "ttimer.h" #include "wal.h" +#include "streamsm.h" typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; @@ -34,16 +35,17 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); static void tryLaunchHistoryTask(void* param, void* tmrId); +static int32_t updateTaskReadyInMeta(SStreamTask* pTask); -static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { - SStreamMeta* pMeta = pTask->pMeta; - int32_t vgId = pMeta->vgId; +int32_t streamTaskSetReady(SStreamTask* pTask) { + char* p = NULL; + int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if (status == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", - pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, - streamGetTaskStatusStr(pTask->status.taskStatus)); + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p); } ASSERT(pTask->status.downstreamReady == 0); @@ -52,34 +54,10 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { pTask->execInfo.start = taosGetTimestampMs(); int64_t el = (pTask->execInfo.start - pTask->execInfo.init); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", - pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); + pTask->id.idStr, numOfDowns, el, p); - taosWLockLatch(&pMeta->lock); - - STaskId id = streamTaskExtractKey(pTask); - taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); - - int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - - if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - pStartInfo->readyTs = pTask->execInfo.start; - - if (pStartInfo->startTs != 0) { - pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; - } else { - pStartInfo->elapsedTime = 0; - } - - streamMetaResetStartInfo(pStartInfo); - - stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", - vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, - pStartInfo->elapsedTime / 1000.0); - } - - taosWUnLockLatch(&pMeta->lock); + updateTaskReadyInMeta(pTask); + return TSDB_CODE_SUCCESS; } int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { @@ -114,28 +92,19 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) { } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { - ASSERT(pTask->status.downstreamReady == 1); + ASSERT(pTask->status.downstreamReady == 1 && streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - return doStartScanHistoryTask(pTask); - } else { - ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); - stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus, - walReaderGetCurrentVer(pTask->exec.pWalReader)); - streamTaskEnablePause(pTask); - } + return doStartScanHistoryTask(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); streamTaskEnablePause(pTask); } } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); - } + stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } + return 0; } @@ -152,6 +121,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { .stage = pTask->pMeta->stage, }; + ASSERT(pTask->status.downstreamReady == 0); + // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); @@ -187,11 +158,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { } } else { stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - - streamTaskSetReady(pTask, 0); - streamTaskSetRangeStreamCalc(pTask); - streamTaskStartScanHistory(pTask); - streamLaunchFillHistoryTask(pTask); + streamTaskOnHandleEventSuccess(pTask->status.pSM); } return 0; @@ -288,8 +255,57 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } } -static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { - streamTaskSetReady(pTask, numOfReqs); +int32_t onNormalTaskReady(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + + streamTaskSetReady(pTask); + streamTaskSetRangeStreamCalc(pTask); + + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + ASSERT(status == TASK_STATUS__NORMAL); + + // todo refactor: remove this later + if (pTask->info.fillHistory == 1) { + stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + ASSERT(pTask->hTaskInfo.id.taskId == 0); + } + + stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, + id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); + + streamTaskEnablePause(pTask); + return TSDB_CODE_SUCCESS; +} + +int32_t onScanhistoryTaskReady(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + + // set the state to be ready + streamTaskSetReady(pTask); + streamTaskSetRangeStreamCalc(pTask); + + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + ASSERT(status == TASK_STATUS__SCAN_HISTORY); + + stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); + streamTaskStartScanHistory(pTask); + + // start the related fill-history task, when current task is ready + if (pTask->hTaskInfo.id.taskId != 0) { + streamLaunchFillHistoryTask(pTask); + } + + return TSDB_CODE_SUCCESS; +} + +// todo: refactor this function. +static void doProcessDownstreamReadyRsp(SStreamTask* pTask) { + streamTaskOnHandleEventSuccess(pTask->status.pSM); + +#if 0 const char* id = pTask->id.idStr; int8_t status = pTask->status.taskStatus; @@ -314,6 +330,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { streamTaskEnablePause(pTask); } } +#endif } int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -349,7 +366,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - doProcessDownstreamReadyRsp(pTask, numOfReqs); + doProcessDownstreamReadyRsp(pTask); } else { int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, @@ -361,7 +378,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return -1; } - doProcessDownstreamReadyRsp(pTask, 1); + doProcessDownstreamReadyRsp(pTask); } } else { // not ready, wait for 100ms and retry if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { @@ -438,18 +455,6 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } } -int32_t streamSetStatusUnint(SStreamTask* pTask) { - int32_t status = atomic_load_8(&pTask->status.taskStatus); - if (status == TASK_STATUS__DROPPING) { - stError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr); - return -1; - } else { - stDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT); - return 0; - } -} - // source int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); @@ -515,9 +520,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t taskLevel = pTask->info.taskLevel; ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) { + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + if (status != TASK_STATUS__SCAN_HISTORY) { stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", - pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->upstreamTaskId); + pTask->id.idStr, p, pReq->upstreamTaskId); void* pBuf = NULL; int32_t len = 0; @@ -571,12 +578,12 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory } int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); + ASSERT(/*pTask->status.taskStatus*/ streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY); SStreamMeta* pMeta = pTask->pMeta; // execute in the scan history complete call back msg, ready to process data from inputQ - streamSetStatusNormal(pTask); - streamTaskSetSchedStatusInActive(pTask); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED); + streamTaskSetSchedStatusInactive(pTask); taosWLockLatch(&pMeta->lock); streamMetaSaveTask(pMeta, pTask); @@ -604,15 +611,15 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 - " ver range:%" PRId64 " - %" PRId64", init:%"PRId64, + " verRange:%" PRId64 " - %" PRId64", init:%"PRId64, pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init); } else { - stDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); + stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); } // check if downstream tasks have been ready - doCheckDownstreamStatus(pHTask); + streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCAN_HISTORY); } static void tryLaunchHistoryTask(void* param, void* tmrId) { @@ -625,11 +632,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ASSERT((*ppTask)->status.timerActive >= 1); if (streamTaskShouldStop(&(*ppTask)->status)) { - const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); - + char* p = NULL; + streamTaskGetStatus((*ppTask), &p); int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", - (*ppTask)->id.idStr, pStatus, (*ppTask)->hTaskInfo.retryTimes, ref); + (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); taosMemoryFree(pInfo); taosWUnLockLatch(&pMeta->lock); @@ -666,8 +673,10 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { - const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); - int32_t hTaskId = pHTaskInfo->id.taskId; + char* p = NULL; + int32_t hTaskId = pHTaskInfo->id.taskId; + + streamTaskGetStatus(pTask, &p); stDebug( "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); @@ -713,11 +722,8 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, in int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->hTaskInfo.id.taskId; - if (hTaskId == 0) { - return TSDB_CODE_SUCCESS; - } - ASSERT(pTask->status.downstreamReady == 1); + ASSERT((hTaskId != 0) && (pTask->status.downstreamReady == 1)); stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, pTask->hTaskInfo.id.streamId, hTaskId); @@ -931,12 +937,12 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { // only the downstream tasks are ready, set the task to be ready to work. void streamTaskCheckDownstream(SStreamTask* pTask) { - if (pTask->info.fillHistory) { - stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); - return; - } +// if (pTask->info.fillHistory) { +// ASSERT(0); +// stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); +// return; +// } - ASSERT(pTask->status.downstreamReady == 0); doCheckDownstreamStatus(pTask); } @@ -1047,13 +1053,50 @@ void streamTaskEnablePause(SStreamTask* pTask) { void streamTaskResumeFromHalt(SStreamTask* pTask) { const char* id = pTask->id.idStr; - int8_t status = pTask->status.taskStatus; - if (status != TASK_STATUS__HALT) { - stError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status)); - return; + char* p = NULL; + + ASSERT(streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT); +// int8_t status = pTask->status.taskStatus; +// if (status != TASK_STATUS__HALT) { +// stError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status)); +// return; +// } + +// pTask->status.taskStatus = pTask->status.keepTaskStatus; +// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; + streamTaskRestoreStatus(pTask); + streamTaskGetStatus(pTask, &p); + stDebug("s-task:%s resume from halt, current status:%s", id, p); +} + +int32_t updateTaskReadyInMeta(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + + taosWLockLatch(&pMeta->lock); + + STaskId id = streamTaskExtractKey(pTask); + taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); + + int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + + if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + pStartInfo->readyTs = pTask->execInfo.start; + + if (pStartInfo->startTs != 0) { + pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; + } else { + pStartInfo->elapsedTime = 0; + } + + streamMetaResetStartInfo(pStartInfo); + + stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64 + ", readyTs:%" PRId64 " total elapsed time:%.2fs", + pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pStartInfo->elapsedTime / 1000.0); } - pTask->status.taskStatus = pTask->status.keepTaskStatus; - pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; - stDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); + taosWUnLockLatch(&pMeta->lock); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5898a02ea1..4b39d979b6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -304,11 +304,11 @@ static void freeUpstreamItem(void* p) { void tFreeStreamTask(SStreamTask* pTask) { int32_t taskId = pTask->id.taskId; + char* p = NULL; + streamTaskGetStatus(pTask, &p); STaskExecStatisInfo* pStatis = &pTask->execInfo; - - stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, - streamGetTaskStatusStr(pTask->status.taskStatus)); + stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, p); stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 @@ -417,6 +417,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } + pTask->status.pSM = streamCreateStateMachine(pTask); + if (pTask->status.pSM == NULL) { + stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr, + tstrerror(terrno)); + return terrno; + } + pTask->execInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; @@ -463,7 +470,9 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { return 0; } else { int32_t type = pTask->outputInfo.type; - if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) { + if (type == TASK_OUTPUT__TABLE) { + return 0; + } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { return 1; } else { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; @@ -677,7 +686,7 @@ int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) { return status; } -int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { +int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { taosThreadMutexLock(&pTask->lock); int8_t status = pTask->status.schedStatus; ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE || diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c219556c6f..b4b4d026c2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "executor.h" #include "streamInt.h" #include "tmisce.h" #include "tstream.h" @@ -32,19 +31,42 @@ SStreamTaskState StreamTaskStatusList[8] = { {.state = TASK_STATUS__CK, .name = "checkpoint"}, }; -static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn); -static int32_t initStateTransferTable(SStreamTaskSM* pSM); +SStreamEventInfo StreamTaskEventList[8] = { + {}, // dummy event, place holder + {.event = TASK_EVENT_INIT, .name = "initialize"}, + {.event = TASK_EVENT_INIT_SCAN_HISTORY, .name = "scan-history-initialize"}, + {.event = TASK_EVENT_SCANHIST_COMPLETED, .name = "scan-history-completed"}, +}; -static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return 0; } -static int32_t streamTaskStartCheckDownstream(SStreamTask* pTask) { - stDebug("s-task:%s start to check downstream tasks", pTask->id.idStr); +static int32_t initStateTransferTable(SStreamTaskSM* pSM); +static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, + __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn, + bool autoInvoke); +static int32_t streamTaskInitStatus(SStreamTask* pTask); +static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); + +static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } + +int32_t streamTaskInitStatus(SStreamTask* pTask) { + pTask->execInfo.init = taosGetTimestampMs(); + + stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, + pTask->execInfo.init); streamTaskCheckDownstream(pTask); return 0; } + +int32_t streamTaskSetReadyForWal(SStreamTask* pTask) { + stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr); + streamSetStatusNormal(pTask); // todo remove it + return TSDB_CODE_SUCCESS; +} + static int32_t streamTaskDoPause(SStreamTask* pTask) { stDebug("s-task:%s start to pause tasks", pTask->id.idStr); return 0; } + static int32_t streamTaskDoResume(SStreamTask* pTask) { stDebug("s-task:%s start to resume tasks", pTask->id.idStr); return 0; @@ -54,12 +76,32 @@ static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { return 0; } +int32_t streamTaskWaitBeforeHalt(SStreamTask* pTask) { + char* p = NULL; + while (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) { + stDebug("related stream task:%s(status:%s) not ready for halt, wait for 100ms and retry", pTask->id.idStr, p); + taosMsleep(100); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + + pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (pTask->hTaskInfo.haltVer == -1) { + pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1; + } + return TSDB_CODE_SUCCESS; +} + // todo optimize the perf of find the trans objs by using hash table -static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent* pEvent) { +static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) { int32_t numOfTrans = taosArrayGetSize(pState->pTransList); for(int32_t i = 0; i < numOfTrans; ++i) { STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i); - if (pTrans->state.state == pState->current.state && pTrans->event == *pEvent) { + if (pTrans->state.state == pState->current.state && pTrans->event == event) { return pTrans; } } @@ -68,6 +110,20 @@ static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, con return NULL; } +void streamTaskRestoreStatus(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + taosThreadMutexLock(&pTask->lock); + ASSERT(pSM->pActiveTrans == NULL); + + SStreamTaskState state = pSM->current; + pSM->current = pSM->prev; + pSM->prev = state; + pSM->startTs = taosGetTimestampMs(); + + taosThreadMutexUnlock(&pTask->lock); + stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.name, pSM->current.name); +} + SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM)); if (pSM == NULL) { @@ -79,7 +135,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { // set the initial state for the state-machine of stream task pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; - pSM->stateTs = taosGetTimestampMs(); + pSM->startTs = taosGetTimestampMs(); int32_t code = initStateTransferTable(pSM); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pSM); @@ -88,36 +144,71 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { return pSM; } -int32_t taskSMHandleEvent(SStreamTaskSM* pSM, const EStreamTaskEvent* pEvent) { - STaskStateTrans* pTrans = streamTaskFindTransform(pSM, pEvent); - ASSERT(pSM->pActiveTrans == NULL); - - stDebug("start to handle event:%d, state:%s", *pEvent, pSM->current.name); - pSM->pActiveTrans = pTrans; - pSM->stateTs = taosGetTimestampMs(); - return pTrans->pAction(pSM->pTask); -} - -int32_t taskSMOnHandleEventSuccess(SStreamTaskSM* pSM) { - STaskStateTrans* pTrans = pSM->pActiveTrans; - EStreamTaskEvent* pEvent = &pTrans->event; - - int64_t el = (taosGetTimestampMs() - pSM->stateTs); - stDebug("handle event:%d completed, elapsd time:%" PRId64 "ms new state:%s from %s", *pEvent, el, pTrans->next.name, +int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { + STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); + stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name, pSM->current.name); - // todo: add lock - pSM->current = pTrans->next; + int32_t code = pTrans->preAction(pSM->pTask); + + taosThreadMutexLock(&pSM->pTask->lock); + ASSERT(pSM->pActiveTrans == NULL); + pSM->pActiveTrans = pTrans; + pSM->startTs = taosGetTimestampMs(); + taosThreadMutexUnlock(&pSM->pTask->lock); + + code = pTrans->pAction(pSM->pTask); + // todo handle error code; + + if (pTrans->autoInvokeEndFn) { + streamTaskOnHandleEventSuccess(pSM); + } + + return code; } +int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) { + STaskStateTrans* pTrans = pSM->pActiveTrans; + SStreamTask* pTask = pSM->pTask; + // do update the task status + taosThreadMutexLock(&pTask->lock); + SStreamTaskState current = pSM->current; -STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn) { + pSM->prev = pSM->current; + pSM->current = pTrans->next; + pSM->pActiveTrans = NULL; + + // on success callback, add into lock if necessary, or maybe we should add an option for this? + pTrans->pSuccAction(pTask); + taosThreadMutexUnlock(&pTask->lock); + + int64_t el = (taosGetTimestampMs() - pSM->startTs); + stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, + StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name); + return TSDB_CODE_SUCCESS; +} + +ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) { + SStreamTaskState s = pTask->status.pSM->current; // copy one obj in case of multi-thread environment + if (pStr != NULL) { + *pStr = s.name; + } + return s.state; +} + +STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, + __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn, + bool autoInvoke) { STaskStateTrans trans = {0}; trans.state = StreamTaskStatusList[current]; trans.next = StreamTaskStatusList[next]; trans.event = event; + + trans.preAction = (preFn != NULL)? preFn:dummyFn; trans.pAction = (fn != NULL)? fn : dummyFn; + trans.pSuccAction = (succFn != NULL)? succFn:dummyFn; + trans.autoInvokeEndFn = autoInvoke; return trans; } @@ -129,20 +220,50 @@ int32_t initStateTransferTable(SStreamTaskSM* pSM) { } } - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, streamTaskStartCheckDownstream); + // initialization event handle + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, NULL, + streamTaskInitStatus, onNormalTaskReady, false); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, streamTaskDoPause); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCAN_HISTORY, NULL, + streamTaskInitStatus, onScanhistoryTaskReady, false); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, streamTaskDoResume); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_COMPLETED, NULL, + streamTaskSetReadyForWal, NULL, true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, streamTaskDoCheckpoint); + // pause & resume related event handle + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, streamTaskDoPause, NULL, + true); taosArrayPush(pSM->pTransList, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, NULL, streamTaskDoResume, + NULL, true); taosArrayPush(pSM->pTransList, &trans); + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, + streamTaskDoCheckpoint, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL, NULL, NULL, true); + taosArrayPush(pSM->pTransList, &trans); + + // halt stream task, from other task status + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL, + streamTaskKeepCurrentVerInWal, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, + NULL, streamTaskKeepCurrentVerInWal, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, NULL, + streamTaskKeepCurrentVerInWal, true); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL, + streamTaskKeepCurrentVerInWal, true); + taosArrayPush(pSM->pTransList, &trans); return 0; } \ No newline at end of file