diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 22e9356b3d..7dcb2e1796 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -287,19 +287,24 @@ typedef struct SStreamId { const char* idStr; } SStreamId; +typedef struct SCheckpointInfo { + int64_t id; + int64_t version; // offset in WAL +} SCheckpointInfo; + struct SStreamTask { - SStreamId id; - int32_t totalLevel; - int8_t taskLevel; - int8_t outputType; - int16_t dispatchMsgType; - int8_t taskStatus; - int8_t schedStatus; - int32_t selfChildId; - int32_t nodeId; - SEpSet epSet; - int64_t recoverSnapVer; - int64_t startVer; + SStreamId id; + int32_t totalLevel; + int8_t taskLevel; + int8_t outputType; + int16_t dispatchMsgType; + int8_t taskStatus; + int8_t schedStatus; + int32_t selfChildId; + int32_t nodeId; + SEpSet epSet; + SCheckpointInfo chkInfo; + STaskExec exec; // fill history int8_t fillHistory; @@ -309,9 +314,6 @@ struct SStreamTask { int32_t nextCheckId; SArray* checkpointInfo; // SArray - // exec - STaskExec exec; - // output union { STaskDispatcherFixedEp fixedEpDispatcher; @@ -587,7 +589,7 @@ void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); // SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 447c90eb58..ee6f649d52 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -76,7 +76,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; - pTask->startVer = ver; + pTask->chkInfo.version = ver; pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 91e2569a8c..afdbf4e7c8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -906,7 +906,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; - pTask->startVer = ver; + pTask->chkInfo.version = ver; pTask->pMeta = pTq->pStreamMeta; // expand executor @@ -981,7 +981,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, - pTask->startVer, pTask->selfChildId, pTask->taskLevel); + pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); return 0; } @@ -1124,7 +1124,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } // check param - int64_t fillVer1 = pTask->startVer; + int64_t fillVer1 = pTask->chkInfo.version; if (fillVer1 <= 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 14054ad998..5f4e0ced11 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -96,7 +96,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, pTask->startVer); + doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b80c952ee0..61f7747ae4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2088,7 +2088,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2112,7 +2112,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } } } else { // not merge block data - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2352,7 +2352,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(&merge, piRow, piSchema); } else { init = true; - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2575,7 +2575,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc SRow* pTSRow = NULL; SRowMerger merge = {0}; - int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); + code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3242,8 +3242,8 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; while (1) { - bool hasNext = false; - int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext); + bool hasNext = false; + code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext); if (code) { return code; } @@ -3515,8 +3515,8 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion; int64_t endVer = 0; - if (pCond->endVersion == - -1) { // user not specified end version, set current maximum version of vnode as the endVersion + if (pCond->endVersion == -1) { + // user not specified end version, set current maximum version of vnode as the endVersion endVer = pVnode->state.applied; } else { endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8dafafcc5f..98052ec6ba 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -288,9 +288,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (dataVer > pTask->startVer) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, dataVer); - pTask->startVer = dataVer; + if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated + qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer); + pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c1f50178dd..b1f0a63c2e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -70,8 +70,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; int32_t epSz = taosArrayGetSize(pTask->childEpInfo); @@ -123,8 +123,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; int32_t epSz;