diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f98a69097b..ffebd783ac 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1205,8 +1205,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); - qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr, - vgId, pTask->info.taskLevel, req.checkpointId, req.transId); + if (req.mndTrigger) { + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, + vgId, pTask->info.taskLevel, req.checkpointId, req.transId); + } else { + const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask)); + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 + ", transId:%d after transfer-state, prev status:%s", + pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); + } code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 87f63b48ed..830fbdcfff 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -130,7 +130,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTransferStatePrepare(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamCheckpoint.c 
b/source/libs/stream/src/streamCheckpoint.c index 607e31bfe6..f58c72eded 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -300,6 +300,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); SStreamTaskState* pStatus = streamTaskGetStatus(p); + ETaskStatus prevStatus = pStatus->state; + if (pStatus->state == TASK_STATUS__CK) { ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && pCKInfo->checkpointVer <= pCKInfo->processedVer); @@ -325,8 +327,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { } stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 - ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s", - vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); + ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s", + vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, + streamTaskGetStatusStr(prevStatus)); // save the task if not sink task if (p->info.taskLevel <= TASK_LEVEL__SINK) { @@ -437,9 +440,11 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { if (type == UPLOAD_DISABLE) { return 0; } + if (pTask == NULL || pTask->pBackend == NULL) { return 0; } + SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId); @@ -448,16 +453,19 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); } -int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; - const char* id = 
pTask->id.idStr; - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - // sink task do not need to save the status, and generated the checkpoint +int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; + const char* id = pTask->id.idStr; + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + SStreamMeta* pMeta = pTask->pMeta; + + // sink task does not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); + code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); @@ -500,10 +508,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1); } else { stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId); } + taosThreadMutexUnlock(&pTask->lock); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 78b914c3db..b76195f214 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1086,10 +1086,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // transtate msg has been sent to downstream successfully. 
let's transfer the fill-history task state if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); + stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 27cd98aac6..fa1b508e23 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,7 +21,7 @@ #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms -static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); +static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState* pState = streamTaskGetStatus(pTask); @@ -316,7 +316,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr; @@ -340,9 +340,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.; stDebug( - "s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer " - "exec state", - id, el, pStreamTask->id.idStr); + "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s " + "info, prepare transfer exec state", + id, streamTaskGetStatus(pTask)->name, el, 
pStreamTask->id.idStr); } ETaskStatus status = streamTaskGetStatus(pStreamTask)->state; @@ -365,9 +365,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } } - // wait for the stream task to handle all in the inputQ, and to be idle - waitForTaskIdle(pTask, pStreamTask); - // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. @@ -393,17 +390,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } - // 2. transfer the ownership of executor state - streamTaskReleaseState(pTask); - streamTaskReloadState(pStreamTask); - - // 3. send msg to mnode to launch a checkpoint to keep the state for current stream + // NOTE: the ownership of the executor state is transferred later, right before the checkpoint block is handled during stream exec (see doStreamExecTask) + // 2. send msg to mnode to launch a checkpoint to keep the state for current stream streamTaskSendCheckpointReq(pStreamTask); - // 4. assign the status to the value that will be kept in disk + // 3. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; - // 5. open the inputQ for all upstream tasks + // 4. 
open the inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); @@ -416,7 +410,7 @@ static int32_t haltCallback(SStreamTask* pTask, void* param) { return TSDB_CODE_SUCCESS; } -int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStatePrepare(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; SStreamMeta* pMeta = pTask->pMeta; @@ -424,7 +418,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. - code = streamDoTransferStateToStreamTask(pTask); + code = streamTransferStateDoPrepare(pTask); } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); @@ -540,7 +534,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); } @@ -621,10 +615,31 @@ int32_t doStreamExecTask(SStreamTask* pTask) { } } - int64_t st = taosGetTimestampMs(); + if (type == STREAM_INPUT__CHECKPOINT) { + // transfer the state from fill-history to related stream task before generating the checkpoint. 
+ bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - const SStreamQueueItem* pItem = pInput; - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + // transfer the ownership of executor state from the fill-history task to this stream task + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } + } + + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type)); int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 959d1382a3..af11cc9ddb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -791,8 +791,10 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); if (resetRelHalt) { + stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s", + sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus), + streamTaskGetStatus(*ppStreamTask)->name); (*ppStreamTask)->status.taskStatus = TASK_STATUS__READY; - stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId); } streamMetaSaveTask(pMeta, *ppStreamTask);