From c32e60d199fa99cf97398dfa508975f24a09b294 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 1 Nov 2023 16:27:41 +0800 Subject: [PATCH] chore: more code for rsma checkpoint --- source/dnode/vnode/src/inc/sma.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 5 +++-- source/libs/executor/src/executor.c | 8 ++++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 5dd7df0962..48e9aed6c2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -159,6 +159,7 @@ struct SRSmaInfo { void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t STaosQueue *queue; // buffer queue of SubmitReq STaosQall *qall; // buffer qall of SubmitReq + SSDataBlock dataBlock; }; #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 7c7f9fad25..303c222ae5 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -287,8 +287,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } if (pStreamTask->chkInfo.checkpointId != -1) { - SSDataBlock dataBlock = {.info.type = STREAM_CHECKPOINT}; - if ((terrno = qSetSMAInput(pRSmaInfo->taskInfo[idx], &dataBlock, 1, STREAM_INPUT__CHECKPOINT)) < 0) { + SSDataBlock *pDataBlock = &pRSmaInfo->dataBlock; + if ((terrno = qSetSMAInput(pRSmaInfo->taskInfo[idx], pDataBlock, 1, STREAM_INPUT__CHECKPOINT)) < 0) { return TSDB_CODE_FAILED; } } @@ -370,6 +370,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo->pSma = pSma; pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->suid = suid; + pRSmaInfo->dataBlock.info.type = STREAM_CHECKPOINT; T_REF_INIT_VAL(pRSmaInfo, 1); if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) || diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2eac04db88..b46ae9e1c0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -58,17 +58,20 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf SStreamScanInfo* pInfo = pOperator->info; if (type == STREAM_INPUT__MERGED_SUBMIT) { + qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type); for (int32_t i = 0; i < numOfBlocks; i++) { SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); taosArrayPush(pInfo->pBlockLists, pReq); } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { + qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type); taosArrayPush(pInfo->pBlockLists, &input); pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type); SPackedData tmp = { .pDataBlock = pDataBlock, }; @@ -78,8 +81,9 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf } else if (type == STREAM_INPUT__CHECKPOINT) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedData tmp = { - .pDataBlock = pDataBlock, + qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type); + SPackedData tmp = { + .pDataBlock = pDataBlock, }; taosArrayPush(pInfo->pBlockLists, &tmp); }