From 9649e87cabf416c2a151178607116351886e65ae Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 2 Nov 2023 09:44:43 +0800 Subject: [PATCH] fix: rsma checkpoint --- source/dnode/vnode/src/inc/sma.h | 2 +- source/dnode/vnode/src/sma/smaCommit.c | 2 -- source/dnode/vnode/src/sma/smaEnv.c | 8 +++++++- source/dnode/vnode/src/sma/smaRollup.c | 3 +-- source/libs/executor/src/executor.c | 17 +++-------------- source/libs/executor/src/scanoperator.c | 3 +-- 6 files changed, 13 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index a8807483f6..bce5e1b0b2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -115,7 +115,7 @@ struct SRSmaStat { SRSmaFS fs; // for recovery/snapshot r/w SHashObj *infoHash; // key: suid, value: SRSmaInfo tsem_t notEmpty; // has items in queue buffer - SSDataBlock dataBlock; + SArray *blocks; // SArray }; struct SSmaStat { diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 5a6144b3fa..652aab3c01 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -178,8 +178,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; - - code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 94ecb46473..d47398bdff 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -209,7 +209,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS pRSmaStat->pSma = (SSma *)pSma; atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); tsem_init(&pRSmaStat->notEmpty, 0, 0); - pRSmaStat->dataBlock.info.type = STREAM_CHECKPOINT; + if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + SSDataBlock datablock = {.info.type = STREAM_CHECKPOINT}; + taosArrayPush(pRSmaStat->blocks, &datablock); // init smaMgmt smaInit(); @@ -291,6 +296,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { // step 5: free pStat tsem_destroy(&(pStat->notEmpty)); + taosArrayDestroy(pStat->blocks); taosMemoryFreeClear(pStat); } } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 88bdc2df1d..8e00297564 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1152,7 +1152,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t nTaskInfo = 0; SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; - SSDataBlock *pDataBlock = &pRSmaStat->dataBlock; SArray *pResList = NULL; SRSmaFS fs = {0}; @@ -1169,7 +1168,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - code = qSetSMAInput(pRSmaInfo->taskInfo[i], pDataBlock, 1, STREAM_INPUT__CHECKPOINT); + code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT); TSDB_CHECK_CODE(code, lino, _exit); pRSmaInfo->items[i].streamFlushed = 0; ++nTaskInfo; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c08a2d38f9..8117ceb55c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -58,35 +58,24 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf SStreamScanInfo* pInfo = pOperator->info; if (type == STREAM_INPUT__MERGED_SUBMIT) { - qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type); for (int32_t i = 0; i < numOfBlocks; i++) { SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); taosArrayPush(pInfo->pBlockLists, pReq); } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { - qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type); taosArrayPush(pInfo->pBlockLists, &input); pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type); - SPackedData tmp = { - .pDataBlock = pDataBlock, - }; + SPackedData tmp = {.pDataBlock = pDataBlock}; taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else if (type == STREAM_INPUT__CHECKPOINT) { - for (int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type); - SPackedData tmp = { - .pDataBlock = pDataBlock, - }; - taosArrayPush(pInfo->pBlockLists, &tmp); - } + SPackedData tmp = {.pDataBlock = input}; + taosArrayPush(pInfo->pBlockLists, &tmp); pInfo->blockType = STREAM_INPUT__CHECKPOINT; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b7071d3f52..247dde7fc3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2335,8 +2335,7 @@ FETCH_NEXT_BLOCK: qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); - // SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); - SSDataBlock* pBlock = pData->pDataBlock; + SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); if (pBlock->info.type == STREAM_CHECKPOINT) { streamScanOperatorSaveCheckpoint(pInfo);