From 3803f952f9d763f8b27b0e706fe39f9ad3306ebf Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 1 Nov 2023 20:05:00 +0800 Subject: [PATCH] chore: rsma checkpoint verify --- source/dnode/vnode/src/inc/sma.h | 5 +- source/dnode/vnode/src/sma/smaCommit.c | 2 + source/dnode/vnode/src/sma/smaEnv.c | 1 + source/dnode/vnode/src/sma/smaRollup.c | 181 +++++++++++++++++++----- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/scanoperator.c | 7 +- 6 files changed, 156 insertions(+), 42 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 48e9aed6c2..a8807483f6 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -115,6 +115,7 @@ struct SRSmaStat { SRSmaFS fs; // for recovery/snapshot r/w SHashObj *infoHash; // key: suid, value: SRSmaInfo tsem_t notEmpty; // has items in queue buffer + SSDataBlock dataBlock; }; struct SSmaStat { @@ -140,7 +141,8 @@ struct SRSmaInfoItem { int8_t fetchLevel : 4; int8_t triggerStat; uint16_t nScanned; - int32_t maxDelay; // ms + int32_t streamFlushed : 1; + int32_t maxDelay : 31; // ms tmr_h tmrId; void *pStreamState; void *pStreamTask; // SStreamTask @@ -159,7 +161,6 @@ struct SRSmaInfo { void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t STaosQueue *queue; // buffer queue of SubmitReq STaosQall *qall; // buffer qall of SubmitReq - SSDataBlock dataBlock; }; #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 652aab3c01..5a6144b3fa 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -178,6 +178,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; + + code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 04a254fc7a..94ecb46473 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS pRSmaStat->pSma = (SSma *)pSma; atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); tsem_init(&pRSmaStat->notEmpty, 0, 0); + pRSmaStat->dataBlock.info.type = STREAM_CHECKPOINT; // init smaMgmt smaInit(); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 303c222ae5..88bdc2df1d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -78,32 +78,26 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l */ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (pInfo) { - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - SRSmaInfoItem *pItem = &pInfo->items[i]; + if (isDeepFree) { + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = &pInfo->items[i]; - if (isDeepFree && pItem->tmrId) { - smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, - pInfo->suid, i + 1); - taosTmrStopA(&pItem->tmrId); - } + if (pItem->tmrId) { + smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, + pInfo->suid, i + 1); + taosTmrStopA(&pItem->tmrId); + } - if (isDeepFree && pItem->pStreamState) { - streamStateClose(pItem->pStreamState, false); - } + if (pItem->pStreamState) { + streamStateClose(pItem->pStreamState, false); + } - if(isDeepFree && pItem->pStreamTask) { taosMemoryFreeClear(pItem->pStreamTask); - } - - if (isDeepFree && pInfo->taskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } - } - if (isDeepFree) { - taosMemoryFreeClear(pInfo->pTSchema); - } - if (isDeepFree) { + taosMemoryFreeClear(pInfo->pTSchema); + if (pInfo->queue) { taosCloseQueue(pInfo->queue); pInfo->queue = NULL; @@ -286,13 +280,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat return TSDB_CODE_FAILED; } - if (pStreamTask->chkInfo.checkpointId != -1) { - SSDataBlock *pDataBlock = &pRSmaInfo->dataBlock; - if ((terrno = qSetSMAInput(pRSmaInfo->taskInfo[idx], pDataBlock, 1, STREAM_INPUT__CHECKPOINT)) < 0) { - return TSDB_CODE_FAILED; - } - } - SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot pItem->pStreamState = pStreamState; @@ -370,7 +357,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo->pSma = pSma; pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->suid = suid; - pRSmaInfo->dataBlock.info.type = STREAM_CHECKPOINT; T_REF_INIT_VAL(pRSmaInfo, 1); if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) || @@ -1075,22 +1061,145 @@ _err: return code; } -#if 1 + +static int32_t tdRSmaExecVerifyCheckPoint(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid, SArray **ppResList, int8_t *streamFlushed) { + int32_t code = 0; + int32_t lino = 0; + SSDataBlock *output = NULL; + SArray *pResList = NULL; + + if (!(*ppResList)) { + pResList = taosArrayInit(1, POINTER_BYTES); + if (pResList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *ppResList = pResList; + } else { + pResList = *ppResList; + } + + while (1) { + uint64_t ts; + bool hasMore = false; + code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); + if (code == TSDB_CODE_QRY_IN_EXEC) { + code = 0; + break; + } + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pResList) == 0) { + break; + } +#if 0 + char flag[10] = {0}; + snprintf(flag, 10, "level %" PRIi8, pItem->level); + blockDebugShowDataBlocks(pResList, flag); +#endif + for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { + output = taosArrayGetP(pResList, i); + if(output->info.type == STREAM_CHECKPOINT) { + if (streamFlushed) *streamFlushed = 1; + continue; + } + smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), + output->info.id.uid, output->info.id.groupId, output->info.rows); + + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq2 *pReq = NULL; + + // TODO: the schema update should be handled later(TD-17965) + if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { + code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { + code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; + tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); + taosMemoryFree(pReq); + TSDB_CHECK_CODE(code, lino, _exit); + } + + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); + + if (pReq) { + tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); + taosMemoryFree(pReq); + } + } + } +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64 + ", ver:%" PRIi64, + SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1, + output ? output->info.version : -1); + } else { + smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level); + } + taosArrayDestroy(pResList); + qCleanExecTaskBlockBuf(taskInfo); + return code; +} + int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { - int32_t code = 0; - int32_t lino = 0; - SSma *pSma = pRSmaStat->pSma; - SVnode *pVnode = pSma->pVnode; - SRSmaFS fs = {0}; + int32_t code = 0; + int32_t lino = 0; + int32_t nTaskInfo = 0; + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + SSDataBlock *pDataBlock = &pRSmaStat->dataBlock; + SArray *pResList = NULL; + SRSmaFS fs = {0}; if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; } void *infoHash = NULL; + // stream state: trigger checkpoint while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pRSmaInfo->taskInfo[i]) { + code = qSetSMAInput(pRSmaInfo->taskInfo[i], pDataBlock, 1, STREAM_INPUT__CHECKPOINT); + TSDB_CHECK_CODE(code, lino, _exit); + pRSmaInfo->items[i].streamFlushed = 0; + ++nTaskInfo; + } + } + } + // stream state: process checkpoint response in async mode + int32_t nStreamFlushed = 0; + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { + int8_t streamFlushed = 0; + code = tdRSmaExecVerifyCheckPoint(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, + pRSmaInfo->suid, &pResList, &streamFlushed); + TSDB_CHECK_CODE(code, lino, _exit); + if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { + goto _checkpoint; + } + } + } + } + // stream state: build checkpoint in backend +_checkpoint: + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { continue; } @@ -1100,11 +1209,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { if (pItem && pItem->pStreamTask) { SStreamTask *pTask = pItem->pStreamTask; // adaption for API streamTaskBuildCheckpoint - atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); + atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); pTask->checkpointingId = taosGetTimestampNs(); code = streamTaskBuildCheckpoint(pTask); TSDB_CHECK_CODE(code, lino, _exit); - + // save checkpointId to vnode.json (pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId; @@ -1123,7 +1232,7 @@ _exit: terrno = code; return code; } -#endif + /** * @brief trigger to get rsma result in async mode * diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b46ae9e1c0..c08a2d38f9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -646,7 +646,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo blockIndex += 1; current += p->info.rows; - ASSERT(p->info.rows > 0); + ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT); taosArrayPush(pResList, &p); if (current >= rowsThreshold) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index efbc978323..b7071d3f52 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2331,11 +2331,12 @@ FETCH_NEXT_BLOCK: return NULL; } - int32_t current = pInfo->validBlockIndex++; - qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); + int32_t current = pInfo->validBlockIndex++; + qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); - SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); + // SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); + SSDataBlock* pBlock = pData->pDataBlock; if (pBlock->info.type == STREAM_CHECKPOINT) { streamScanOperatorSaveCheckpoint(pInfo);