diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b2325e41f0..bafceb3f5f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -82,8 +82,24 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); - qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, - pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel); + + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + // checkpoint ver is the kept version, handled data should be the next version. + if (pTask->chkInfo.checkpointId != 0) { + pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; + qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); + } else { + if (pTask->chkInfo.currentVer == -1) { + pTask->chkInfo.currentVer = 0; + } + } + + qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->info.fillHistory, pTask->triggerParam); return 0; } @@ -107,6 +123,10 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } + // todo fix it: send msg to mnode to rollback to an existed checkpoint, and broadcast the rollback msg to all other + // computing nodes. + pSnode->pMeta->stage = 0; + return pSnode; FAIL: diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7ebc3bb775..43ad91aab5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -270,7 +270,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR tEncoderClear(&encoder); initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, + qDebug("s-task:%s (level:%d) send check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index c84260169f..eb1f4e6a45 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -97,8 +97,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); + streamTaskEnablePause(pTask); } - streamTaskEnablePause(pTask); + streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); @@ -204,14 +205,15 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ ASSERT(pInfo != NULL); if (stage == -1) { - qDebug("s-task:%s receive msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr, upstreamTaskId, stage); return 0; } if (pInfo->stage == -1) { pInfo->stage = stage; - qDebug("s-task:%s receive msg from upstream task:0x%x, init stage value:%"PRId64, pTask->id.idStr, upstreamTaskId, stage); + qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr, + upstreamTaskId, stage); } if (pInfo->stage < stage) { @@ -424,14 +426,14 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) { - qError("s-task:%s not in scan-history status, return upstream:0x%x scan-history finish directly", pTask->id.idStr, - pReq->upstreamTaskId); + qError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", + pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->upstreamTaskId); void* pBuf = NULL; int32_t len = 0; - SRpcMsg msg = {0}; - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); + + SRpcMsg msg = {.info = *pRpcInfo}; initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); tmsgSendRsp(&msg); @@ -466,7 +468,10 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory streamTaskEnablePause(pTask); int32_t code = streamTaskScanHistoryDataComplete(pTask); } else { // for sink task, set normal - streamSetStatusNormal(pTask); + if (pTask->status.taskStatus != TASK_STATUS__PAUSE && pTask->status.taskStatus != TASK_STATUS__STOP && + pTask->status.taskStatus != TASK_STATUS__DROPPING) { + streamSetStatusNormal(pTask); + } } } else { qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",