From 87bedef2cd8b12dc501bd42e54f68d63f791536b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 10 Jul 2023 19:20:05 +0800 Subject: [PATCH] fix(stream): fix error in checkpointing. --- include/libs/stream/tstream.h | 3 +- source/dnode/vnode/src/tq/tq.c | 32 ++++++++----- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamExec.c | 63 +++++++++++++------------ source/libs/stream/src/streamTask.c | 4 +- 6 files changed, 59 insertions(+), 47 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e90cdb8473..4581d48d1a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -258,7 +258,7 @@ typedef struct SStreamId { } SStreamId; typedef struct SCheckpointInfo { - int64_t id; + int64_t keptCheckpointId; int64_t version; // offset in WAL int64_t currentVer; // current offset in WAL, not serialize it } SCheckpointInfo; @@ -311,7 +311,6 @@ struct SStreamTask { SStreamId historyTaskId; SStreamId streamTaskId; SArray* pUpstreamEpInfoList; // SArray, // children info - SArray* checkpointInfo; // SArray SArray* pRpcMsgList; // SArray // output union { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 97b0d2c5de..7f1f4f1538 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -777,11 +777,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - SReadHandle handle = {.vnode = pTq->pVnode, - .initTqReader = 1, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window}; + SReadHandle handle = { + .vnode = pTq->pVnode, + .initTqReader = 1, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window, + }; initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); @@ -804,13 +806,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); - SReadHandle handle = {.vnode = NULL, - .numOfVgroups = numOfVgroups, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window}; - initStorageAPI(&handle.api); + SReadHandle handle = { + .vnode = NULL, + .numOfVgroups = numOfVgroups, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window, + }; + initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); if (pTask->exec.pExecutor == NULL) { return -1; @@ -819,6 +823,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } + if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) { + tqInfo("s-task:%s status is set to %s prev in meta:%s", pTask->id.idStr, + streamGetTaskStatusStr(TASK_STATUS__SCAN_HISTORY), streamGetTaskStatusStr(pTask->status.taskStatus)); + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; + } + pTask->pRpcMsgList = taosArrayInit(4, sizeof(SRpcMsg)); // sink diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 5de5bea7fd..e2632ee25a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -51,7 +51,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); -int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId); +int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e8220a787d..b2e2bfbda8 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -475,7 +475,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { } // this function is usually invoked by sink/agg task -int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) { +int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pRpcMsgList); ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5e5eecc392..7044eb3b2a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,7 +18,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 -static int32_t updateCheckPointInfo(SStreamTask* pTask); +static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -32,12 +32,6 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, int32_t* totalBlocks) { - int32_t code = updateCheckPointInfo(pTask); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return code; - } - int32_t numOfBlocks = taosArrayGetSize(pRes); if (numOfBlocks > 0) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); @@ -50,7 +44,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size / 1048576.0); - code = streamTaskOutputResultBlock(pTask, pStreamBlocks); + int32_t code = streamTaskOutputResultBlock(pTask, pStreamBlocks); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position destroyStreamDataBlock(pStreamBlocks); return -1; @@ -301,31 +295,17 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -int32_t updateCheckPointInfo(SStreamTask* pTask) { +int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); SCheckpointInfo* pCkInfo = &pTask->chkInfo; - if (ckId > pCkInfo->id) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, - pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); - - pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; - - taosWLockLatch(&pTask->pMeta->lock); - - streamMetaSaveTask(pTask->pMeta, pTask); - if (streamMetaCommit(pTask->pMeta) < 0) { - taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); - return -1; - } else { - taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); - } - } + qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64 + " -> %" PRId64, + pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId); + pCkInfo->keptCheckpointId = checkpointId; + pCkInfo->version = dataVer; return TSDB_CODE_SUCCESS; } @@ -544,15 +524,38 @@ int32_t streamTryExec(SStreamTask* pTask) { if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); + qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%"PRId64, pMeta->vgId); + } + + code = updateCheckPointInfo(pTask, pTask->checkpointingId); + if (code != TSDB_CODE_SUCCESS) { + return code; } // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamTaskSendCheckpointSourceRsp(pTask); + code = streamTaskSendCheckpointSourceRsp(pTask); } else { - streamTaskSendCheckpointRsp(pTask, pMeta->vgId); + code = streamTaskSendCheckpointRsp(pTask); } + if (code == TSDB_CODE_SUCCESS) { + taosWLockLatch(&pTask->pMeta->lock); + + streamMetaSaveTask(pTask->pMeta, pTask); + if (streamMetaCommit(pTask->pMeta) < 0) { + taosWUnLockLatch(&pTask->pMeta->lock); + qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); + return -1; + } else { + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s commit after checkpoint generating", pTask->id.idStr); + } + + qInfo("vgId:%d s-task:%s commit task status after checkpoint completed", pMeta->vgId, pTask->id.idStr); + } else { + // todo: let's retry send rsp to upstream/mnode + } } else { if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1684227a81..30d9b650a1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -85,7 +85,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.keptCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; @@ -148,7 +148,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.keptCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;