fix: rsma checkpoint

This commit is contained in:
kailixu 2023-11-02 09:44:43 +08:00
parent bacf771ada
commit 9649e87cab
6 changed files with 13 additions and 22 deletions

View File

@ -115,7 +115,7 @@ struct SRSmaStat {
SRSmaFS fs; // for recovery/snapshot r/w SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer tsem_t notEmpty; // has items in queue buffer
SSDataBlock dataBlock; SArray *blocks; // SArray<SSDataBlock>
}; };
struct SSmaStat { struct SSmaStat {

View File

@ -178,8 +178,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit; if (!isCommit) goto _exit;
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -209,7 +209,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
pRSmaStat->pSma = (SSma *)pSma; pRSmaStat->pSma = (SSma *)pSma;
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
tsem_init(&pRSmaStat->notEmpty, 0, 0); tsem_init(&pRSmaStat->notEmpty, 0, 0);
pRSmaStat->dataBlock.info.type = STREAM_CHECKPOINT; if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
SSDataBlock datablock = {.info.type = STREAM_CHECKPOINT};
taosArrayPush(pRSmaStat->blocks, &datablock);
// init smaMgmt // init smaMgmt
smaInit(); smaInit();
@ -291,6 +296,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 5: free pStat // step 5: free pStat
tsem_destroy(&(pStat->notEmpty)); tsem_destroy(&(pStat->notEmpty));
taosArrayDestroy(pStat->blocks);
taosMemoryFreeClear(pStat); taosMemoryFreeClear(pStat);
} }
} }

View File

@ -1152,7 +1152,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t nTaskInfo = 0; int32_t nTaskInfo = 0;
SSma *pSma = pRSmaStat->pSma; SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SSDataBlock *pDataBlock = &pRSmaStat->dataBlock;
SArray *pResList = NULL; SArray *pResList = NULL;
SRSmaFS fs = {0}; SRSmaFS fs = {0};
@ -1169,7 +1168,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
} }
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) { if (pRSmaInfo->taskInfo[i]) {
code = qSetSMAInput(pRSmaInfo->taskInfo[i], pDataBlock, 1, STREAM_INPUT__CHECKPOINT); code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pRSmaInfo->items[i].streamFlushed = 0; pRSmaInfo->items[i].streamFlushed = 0;
++nTaskInfo; ++nTaskInfo;

View File

@ -58,35 +58,24 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq); taosArrayPush(pInfo->pBlockLists, pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type);
taosArrayPush(pInfo->pBlockLists, &input); taosArrayPush(pInfo->pBlockLists, &input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type); SPackedData tmp = {.pDataBlock = pDataBlock};
SPackedData tmp = {
.pDataBlock = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT) {
for (int32_t i = 0; i < numOfBlocks; ++i) { SPackedData tmp = {.pDataBlock = input};
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type);
SPackedData tmp = {
.pDataBlock = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
}
pInfo->blockType = STREAM_INPUT__CHECKPOINT; pInfo->blockType = STREAM_INPUT__CHECKPOINT;
} }

View File

@ -2335,8 +2335,7 @@ FETCH_NEXT_BLOCK:
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
// SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
SSDataBlock* pBlock = pData->pDataBlock;
if (pBlock->info.type == STREAM_CHECKPOINT) { if (pBlock->info.type == STREAM_CHECKPOINT) {
streamScanOperatorSaveCheckpoint(pInfo); streamScanOperatorSaveCheckpoint(pInfo);