From 2d597659bc2e4eaa55e4ba6a8bd3f7aaaff61af3 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 2 Nov 2023 15:20:54 +0800 Subject: [PATCH] enh: rsma checkpoint --- source/dnode/vnode/src/sma/smaRollup.c | 69 ++++++++++++------------ source/dnode/vnode/src/vnd/vnodeCommit.c | 4 +- source/libs/executor/src/executil.c | 10 ---- source/libs/executor/src/scanoperator.c | 4 +- 4 files changed, 39 insertions(+), 48 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 986d0da677..f35eec786c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -630,7 +630,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { output = taosArrayGetP(pResList, i); - if(output->info.type == STREAM_CHECKPOINT) { + if (output->info.type == STREAM_CHECKPOINT) { if (streamFlushed) *streamFlushed = 1; continue; } @@ -1076,18 +1076,17 @@ _err: } int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { - int32_t code = 0; - int32_t lino = 0; - int32_t nTaskInfo = 0; - SSma *pSma = pRSmaStat->pSma; - SVnode *pVnode = pSma->pVnode; - SArray *pResList = NULL; - SRSmaFS fs = {0}; + int32_t code = 0; + int32_t lino = 0; + int32_t nTaskInfo = 0; + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + SArray *pResList = NULL; + SRSmaFS fs = {0}; if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; } - void *infoHash = NULL; // stream state: trigger checkpoint while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { @@ -1120,7 +1119,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { pRSmaInfo->suid, &pResList, &streamFlushed); TSDB_CHECK_CODE(code, lino, _exit); if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { - smaInfo("%s:%d checkpoint ready, %d us consumed, received/total: %d/%d", __func__, __LINE__, nSleep * 10, + smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, nStreamFlushed, nTaskInfo); goto _checkpoint; } @@ -1129,38 +1128,40 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } taosUsleep(10); ++nSleep; - smaInfo("%s:%d wait for checkpoint ready, %d us elapsed, received/total: %d/%d", __func__, __LINE__, nSleep * 10, - nStreamFlushed, nTaskInfo); + smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, + nStreamFlushed, nTaskInfo); } - // stream state: build checkpoint in backend + _checkpoint: - while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - continue; - } + do { + void *infHash = NULL; + while ((infHash = taosHashIterate(pInfoHash, infHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infHash; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); - if (pItem && pItem->pStreamTask) { - SStreamTask *pTask = pItem->pStreamTask; - atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); - pTask->checkpointingId = taosGetTimestampNs(); - pTask->chkInfo.checkpointId = pTask->checkpointingId; - code = streamTaskBuildCheckpoint(pTask); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); + if (pItem && pItem->pStreamTask) { + SStreamTask *pTask = pItem->pStreamTask; + atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); + pTask->checkpointingId = taosGetTimestampNs(); + pTask->chkInfo.checkpointId = pTask->checkpointingId; + code = streamTaskBuildCheckpoint(pTask); + TSDB_CHECK_CODE(code, lino, _exit); - // save checkpointId to vnode.json - (pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId; + // save checkpointId to vnode.json + (pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId; - smaInfo("vgId:%d, commit task:%p, build stream checkpoint success, table:%" PRIi64 - ", level:%d, checkpointId:%" PRIi64, - TD_VID(pVnode), pTask, pRSmaInfo->suid, i + 1, pTask->checkpointingId); + smaInfo("vgId:%d, commit task:%p, build stream checkpoint success, table:%" PRIi64 + ", level:%d, checkpointId:%" PRIi64, + TD_VID(pVnode), pTask, pRSmaInfo->suid, i + 1, pTask->checkpointingId); + } } } - } - + } while (0); _exit: taosArrayDestroy(pResList); if (code) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index ca4335f391..9e0106dff4 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -290,8 +290,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; - - code = smaPrepareAsyncCommit(pVnode->pSma); + + code = smaPrepareAsyncCommit(pVnode->pSma); // prepare checkpointId and save to vnode.json if (code) goto _exit; pVnode->state.commitTerm = pVnode->state.applyTerm; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a1bd5a7483..753d3e680c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1967,19 +1967,9 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); } - for(int32_t i=0; i< taosArrayGetSize(pTableList->pTableList); ++i) { - STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, i); - if(pKeyInfo->uid == uid) { - assert(0); - } - } - STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; taosArrayPush(pTableList->pTableList, &keyInfo); - if(taosHashGet(pTableList->map, &uid, sizeof(uid))) { - assert(0); - } int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 247dde7fc3..efbc978323 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2331,8 +2331,8 @@ FETCH_NEXT_BLOCK: return NULL; } - int32_t current = pInfo->validBlockIndex++; - qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); + int32_t current = pInfo->validBlockIndex++; + qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);