diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0a541a34ab..6564bcc769 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -75,7 +75,7 @@ typedef enum ETaskStatus { TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore - TASK_STATUS_STREAM_SCAN_HISTORY, + TASK_STATUS__STREAM_SCAN_HISTORY, } ETaskStatus; enum { @@ -720,6 +720,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr); +const char* streamTaskGetStatusStr(ETaskStatus status); void streamTaskResetStatus(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask); @@ -755,8 +756,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); -int32_t streamSetStatusUnint(SStreamTask* pTask); -const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResumeFromHalt(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0a9e3c5336..7e254e5efb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1183,7 +1183,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", - pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); + pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status)); ready = false; break; } @@ -1567,7 +1567,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock continue; } - const char* pStatus = streamGetTaskStatusStr(pe->status); + const char* pStatus = streamTaskGetStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); // status @@ -2615,7 +2615,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pEntry->status = p->status; if (p->status != TASK_STATUS__READY) { - mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); + mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 750a9d942a..8f7657f98c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1446,7 +1446,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask != NULL) { // drop the related fill-history task firstly - if (pTask->hTaskInfo.id.taskId != 0) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pHTaskId = &pTask->hTaskInfo.id; streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); @@ -1486,7 +1486,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg streamTaskPause(pTask, pMeta); SStreamTask* pHistoryTask = NULL; - if (pTask->hTaskInfo.id.taskId != 0) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); if (pHistoryTask == NULL) { tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 @@ -1868,7 +1868,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamTaskResetStatus(pTask); SStreamTask** ppHTask = NULL; - if (pTask->hTaskInfo.id.taskId != 0) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 346f6cefcb..8c37e785dd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -308,7 +308,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - int8_t status = streamTaskGetStatus(pStreamTask, NULL); + ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL); ASSERT(((status == TASK_STATUS__DROPPING) || (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) && pTask->status.appendTranstateBlock == true); @@ -352,7 +352,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be // pause, since the pause allowed attribute is not set yet. - streamTaskResumeFromHalt(pStreamTask); + streamTaskResumeFromHalt(pStreamTask); // todo refactor: use streamTaskResume. stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ee4f1e2340..206f1fcfc6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -348,6 +348,7 @@ void streamMetaCloseImpl(void* arg) { stDebug("end to close stream meta"); } +// todo let's check the status for each task int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 58ea042079..66865c8e25 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -42,7 +42,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && + pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p); @@ -92,16 +93,20 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) { } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { - ASSERT(pTask->status.downstreamReady == 1 && streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY); + int32_t level = pTask->info.taskLevel; + ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + ASSERT(pTask->status.downstreamReady == 1 && + ((status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__STREAM_SCAN_HISTORY))); + + if (level == TASK_LEVEL__SOURCE) { return doStartScanHistoryTask(pTask); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + } else if (level == TASK_LEVEL__AGG) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); streamTaskEnablePause(pTask); } - } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + } else if (level == TASK_LEVEL__SINK) { stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } @@ -292,13 +297,13 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) { char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); - ASSERT(status == TASK_STATUS__SCAN_HISTORY); + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); streamTaskStartScanHistory(pTask); // start the related fill-history task, when current task is ready - if (pTask->hTaskInfo.id.taskId != 0) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { streamLaunchFillHistoryTask(pTask); } @@ -516,7 +521,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status != TASK_STATUS__SCAN_HISTORY) { + if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) { stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p, pReq->upstreamTaskId); @@ -571,7 +576,9 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory } int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { - ASSERT(/*pTask->status.taskStatus*/ streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); SStreamMeta* pMeta = pTask->pMeta; // execute in the scan history complete call back msg, ready to process data from inputQ @@ -939,9 +946,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { doCheckDownstreamStatus(pTask); } -// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { - int64_t st = taosGetTimestampMs(); #if 0 int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__DROPPING) { @@ -1013,9 +1018,7 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { qKillTask(pExecutor, TSDB_CODE_SUCCESS); } - int64_t el = taosGetTimestampMs() - st; - stDebug("vgId:%d s-task:%s set pause flag, prev:%s, pause elapsed time:%dms", pMeta->vgId, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); + stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); } void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fdcfcfa9a9..1d12401d12 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -732,20 +732,6 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) { pInfo->retryTimes += 1; } -const char* streamGetTaskStatusStr(int32_t status) { - switch(status) { - case TASK_STATUS__READY: return "normal"; - case TASK_STATUS__SCAN_HISTORY: return "scan-history"; - case TASK_STATUS__HALT: return "halt"; - case TASK_STATUS__PAUSE: return "paused"; - case TASK_STATUS__CK: return "check-point"; - case TASK_STATUS__DROPPING: return "dropping"; - case TASK_STATUS__STOP: return "stop"; - case TASK_STATUS__UNINIT: return "uninitialized"; - default:return ""; - } -} - void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) { pEntry->id.streamId = pTask->id.streamId; pEntry->id.taskId = pTask->id.taskId; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 49a434af94..addd563388 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -29,10 +29,10 @@ SStreamTaskState StreamTaskStatusList[9] = { {.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__CK, .name = "checkpoint"}, - {.state = TASK_STATUS_STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, + {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, }; -SStreamEventInfo StreamTaskEventList[10] = { +SStreamEventInfo StreamTaskEventList[11] = { {}, // dummy event, place holder {.event = TASK_EVENT_INIT, .name = "initialize"}, {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"}, @@ -43,6 +43,7 @@ SStreamEventInfo StreamTaskEventList[10] = { {.event = TASK_EVENT_PAUSE, .name = "pausing"}, {.event = TASK_EVENT_RESUME, .name = "resuming"}, {.event = TASK_EVENT_HALT, .name = "halting"}, + {.event = TASK_EVENT_DROPPING, .name = "dropping"}, }; static TdThreadOnce streamTaskStateMachineInit = PTHREAD_ONCE_INIT; @@ -173,35 +174,45 @@ void* streamDestroyStateMachine(SStreamTaskSM* pSM) { } int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { - taosThreadMutexLock(&pSM->pTask->lock); + SStreamTask* pTask = pSM->pTask; + + taosThreadMutexLock(&pTask->lock); STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); - stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name, + stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name, pSM->current.name); if (pTrans->attachEvent.event != 0) { - attachEvent(pSM->pTask, &pTrans->attachEvent); - taosThreadMutexUnlock(&pSM->pTask->lock); + attachEvent(pTask, &pTrans->attachEvent); + taosThreadMutexUnlock(&pTask->lock); while (1) { // wait for the task to be here - ETaskStatus s = streamTaskGetStatus(pSM->pTask, NULL); + taosThreadMutexLock(&pTask->lock); + ETaskStatus s = streamTaskGetStatus(pTask, NULL); + taosThreadMutexUnlock(&pTask->lock); + if (s == pTrans->attachEvent.status) { return TSDB_CODE_SUCCESS; } else {// this event has been handled already - stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pSM->pTask->id.idStr, + stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pTask->id.idStr, StreamTaskEventList[event].name); taosMsleep(100); } } } else { - ASSERT(pSM->pActiveTrans == NULL); + if (pSM->pActiveTrans != NULL) { + ASSERT(!pSM->pActiveTrans->autoInvokeEndFn); + stWarn("s-task:%s status:%s handle event:%s is interrupted, handle the new event:%s", pTask->id.idStr, + pSM->current.name, StreamTaskEventList[pSM->pActiveTrans->event].name, StreamTaskEventList[event].name); + } + pSM->pActiveTrans = pTrans; pSM->startTs = taosGetTimestampMs(); - taosThreadMutexUnlock(&pSM->pTask->lock); + taosThreadMutexUnlock(&pTask->lock); - int32_t code = pTrans->pAction(pSM->pTask); + int32_t code = pTrans->pAction(pTask); // todo handle error code; if (pTrans->autoInvokeEndFn) { @@ -273,6 +284,10 @@ ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr) { return s.state; } +const char* streamTaskGetStatusStr(ETaskStatus status) { + return StreamTaskStatusList[status].name; +} + void streamTaskResetStatus(SStreamTask* pTask) { SStreamTaskSM* pSM = pTask->status.pSM; pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; @@ -304,6 +319,9 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr if (pEventInfo != NULL) { trans.attachEvent = *pEventInfo; + } else { + trans.attachEvent.event = 0; + trans.attachEvent.status = 0; } trans.pAction = (fn != NULL) ? fn : dummyFn; @@ -329,15 +347,16 @@ void doInitStateTransferTable(void) { streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); + // scan-history related event trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, streamTaskSetReadyForWal, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, streamTaskSetReadyForWal, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); @@ -347,7 +366,7 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); taosArrayPush(streamTaskSMTrans, &trans); @@ -375,7 +394,7 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; - trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(streamTaskSMTrans, &trans); @@ -410,7 +429,7 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // dropping related event @@ -430,6 +449,6 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); } \ No newline at end of file