chore: more code for rsma checkpoint

This commit is contained in:
kailixu 2023-11-01 16:27:41 +08:00
parent 59be62c96e
commit c32e60d199
3 changed files with 10 additions and 4 deletions

View File

@ -159,6 +159,7 @@ struct SRSmaInfo {
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq STaosQall *qall; // buffer qall of SubmitReq
SSDataBlock dataBlock;
}; };
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)

View File

@ -287,8 +287,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
if (pStreamTask->chkInfo.checkpointId != -1) { if (pStreamTask->chkInfo.checkpointId != -1) {
SSDataBlock dataBlock = {.info.type = STREAM_CHECKPOINT}; SSDataBlock *pDataBlock = &pRSmaInfo->dataBlock;
if ((terrno = qSetSMAInput(pRSmaInfo->taskInfo[idx], &dataBlock, 1, STREAM_INPUT__CHECKPOINT)) < 0) { if ((terrno = qSetSMAInput(pRSmaInfo->taskInfo[idx], pDataBlock, 1, STREAM_INPUT__CHECKPOINT)) < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
@ -370,6 +370,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo->pSma = pSma; pRSmaInfo->pSma = pSma;
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
pRSmaInfo->dataBlock.info.type = STREAM_CHECKPOINT;
T_REF_INIT_VAL(pRSmaInfo, 1); T_REF_INIT_VAL(pRSmaInfo, 1);
if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) || if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) ||

View File

@ -58,17 +58,20 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq); taosArrayPush(pInfo->pBlockLists, pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
qInfo("%s:%d type:%d, pDataBlock->info.type(N/A)", __func__, __LINE__, type);
taosArrayPush(pInfo->pBlockLists, &input); taosArrayPush(pInfo->pBlockLists, &input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type);
SPackedData tmp = { SPackedData tmp = {
.pDataBlock = pDataBlock, .pDataBlock = pDataBlock,
}; };
@ -78,8 +81,9 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedData tmp = { qInfo("%s:%d type:%d, pDataBlock->info.type:%d", __func__, __LINE__, type, pDataBlock->info.type);
.pDataBlock = pDataBlock, SPackedData tmp = {
.pDataBlock = pDataBlock,
}; };
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }