diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e4bca14a9e..d46c107712 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -294,6 +294,9 @@ typedef struct SCheckpointInfo { int64_t checkpointVer; // latest checkpointId version int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id + int64_t checkpointingId; + int32_t downstreamAlignNum; + int32_t checkpointNotReadyTasks; int64_t msgVer; } SCheckpointInfo; @@ -427,9 +430,6 @@ struct SStreamTask { int64_t checkReqId; SArray* checkReqIds; // shuffle int32_t refCnt; - int64_t checkpointingId; - int32_t checkpointAlignCnt; - int32_t checkpointNotReadyTasks; int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; @@ -477,7 +477,6 @@ typedef struct SStreamMeta { SHashObj* pUpdateTaskSet; int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; - int32_t chkptNotReadyTasks; int64_t rid; int64_t chkpId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d5ddec233..2d831b9dd1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1705,7 +1705,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // Downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id - pTask->checkpointingId = req.checkpointId; + pTask->chkInfo.checkpointingId = req.checkpointId; tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 ", set it failure", @@ -1744,10 +1744,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed - if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; - } - total = pMeta->numOfStreamTasks; streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 095461bd92..7e2e1d6553 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId); +int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId); int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index bf2c89bea4..4f6778a286 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -94,12 +94,12 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea static int32_t streamAlignCheckpoint(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); - int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num); + int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num); if (old == 0) { stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); } - return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); + return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); } static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { @@ -117,7 +117,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint } pBlock->info.type = STREAM_CHECKPOINT; - pBlock->info.version = pTask->checkpointingId; + pBlock->info.version = pTask->chkInfo.checkpointingId; pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; @@ -140,8 +140,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); - pTask->checkpointingId = pReq->checkpointId; - pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.checkpointingId = pReq->checkpointId; + pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -173,7 +173,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set task status if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { - pTask->checkpointingId = checkpointId; + pTask->chkInfo.checkpointingId = checkpointId; code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); @@ -181,17 +181,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } } - { // todo: remove this when the pipeline checkpoint generating is used. - SStreamMeta* pMeta = pTask->pMeta; - streamMetaWLock(pMeta); - - if (pMeta->chkptNotReadyTasks == 0) { - pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks; - } - - streamMetaWUnLock(pMeta); - } - // todo fix race condition: set the status and append checkpoint block int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { @@ -200,7 +189,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); + atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1); streamProcessCheckpointReadyMsg(pTask); streamFreeQitem((SStreamQueueItem*)pBlock); } @@ -235,7 +224,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure - pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks @@ -254,7 +243,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task - int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1); + int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1); ASSERT(notReady >= 0); if (notReady == 0) { @@ -270,35 +259,27 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { } void streamTaskClearCheckInfo(SStreamTask* pTask) { - pTask->checkpointingId = 0; // clear the checkpoint id + pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id pTask->chkInfo.failedId = 0; pTask->chkInfo.startTs = 0; // clear the recorded start time - pTask->checkpointNotReadyTasks = 0; - pTask->checkpointAlignCnt = 0; + pTask->chkInfo.downstreamAlignNum = 0; + pTask->chkInfo.checkpointNotReadyTasks = 0; streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks } -int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; +int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { + SStreamMeta* pMeta = p->pMeta; + int32_t vgId = pMeta->vgId; + int32_t code = 0; - streamMetaWLock(pMeta); - - // for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - // STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - // SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); - // if (ppTask == NULL) { - // continue; - // } - - // SStreamTask* p = *ppTask; if (p->info.fillHistory == 1) { - // continue; + return code; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + streamMetaWLock(pMeta); + ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId); - p->chkInfo.checkpointId = p->checkpointingId; + p->chkInfo.checkpointId = p->chkInfo.checkpointingId; streamTaskClearCheckInfo(p); char* str = NULL; @@ -316,8 +297,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t chec stDebug( "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, - str); + vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str); code = streamMetaCommit(pMeta); if (code < 0) { @@ -332,28 +312,21 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t chec } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; // check for all tasks, and do generate the vnode-wide checkpoint data. - SStreamMeta* pMeta = pTask->pMeta; -// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); -// ASSERT(remain >= 0); + int64_t checkpointStartTs = pTask->chkInfo.startTs; - double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; -// if (remain == 0) { // all tasks are ready - stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); - streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId); - streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId); - stInfo( - "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec " - "checkpointId:%" PRId64, - pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId); -// } else { -// stInfo( -// "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec " -// "not ready:%d/%d", -// pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks); -// } + // sink task do not need to save the status, and generated the checkpoint + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel); + streamBackendDoCheckpoint(pTask->pBackend, pTask->chkInfo.checkpointingId); + streamSaveTaskCheckpointInfo(pTask, pTask->chkInfo.checkpointingId); + } + + double el = (taosGetTimestampMs() - checkpointStartTs) / 1000.0; + stInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " save all tasks status, level:%d elapsed time:%.2f Sec ", + pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.checkpointingId, pTask->info.taskLevel, el); // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -364,17 +337,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // record the failure checkpoint id - pTask->chkInfo.failedId = pTask->checkpointingId; + pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; // todo: let's retry send rsp to upstream/mnode stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, - pTask->checkpointingId, tstrerror(code)); + pTask->chkInfo.checkpointingId, tstrerror(code)); } return code; } - //static int64_t kBlockSize = 64 * 1024; //static int sendCheckpointToS3(char* id, SArray* fileList){ // code = s3PutObjectFromFile2(from->fname, object_name); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b6c973f0d0..1a814eaa84 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -230,7 +230,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char *key) { SStreamTask* pTask = arg; - int64_t chkpId = pTask->checkpointingId; + int64_t chkpId = pTask->chkInfo.checkpointingId; taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key)); @@ -442,7 +442,6 @@ void streamMetaClear(SStreamMeta* pMeta) { taosArrayClear(pMeta->chkpInUse); pMeta->numOfStreamTasks = 0; pMeta->numOfPausedTasks = 0; - pMeta->chkptNotReadyTasks = 0; streamMetaResetStartInfo(&pMeta->startInfo); } @@ -1078,9 +1077,9 @@ void metaHbToMnode(void* param, void* tmrId) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } - if ((*pTask)->checkpointingId != 0) { - entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->checkpointingId); - entry.activeCheckpointId = (*pTask)->checkpointingId; + if ((*pTask)->chkInfo.checkpointingId != 0) { + entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); + entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; } if ((*pTask)->exec.pWalReader != NULL) {