From d9c10a2f0fa7e080ca3eb986da3f51077bc623d7 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 11 Aug 2022 19:59:47 +0800 Subject: [PATCH] other: rsma submit --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 11 +++--- source/dnode/vnode/src/sma/smaRollup.c | 49 ++++++++++++++------------ 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c77cb16e86..410fa02ded 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c5bcab4e76..8489627721 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1875,15 +1875,15 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @param suid * */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { - int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); + int32_t sz = 1; for (int32_t i = 0; i < sz; ++i) { - SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGetP(pDataBlocks, i))->info; + const SDataBlockInfo* pBlkInfo = &pDataBlock->info; - int32_t numOfCols = taosArrayGetSize(pDataBlocks); - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); bufSize += sizeof(SSubmitBlk); } @@ -1900,7 +1900,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGetP(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index bc71851160..b7a2efd489 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -615,7 +615,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm while (1) { uint64_t ts; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); - if (code < 0) { + if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; } else { @@ -627,43 +627,46 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm if (taosArrayGetSize(pResList) == 0) { if (terrno == 0) { - smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); + // smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); } else { smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); goto _err; } break; + } else { + smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); #endif - STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { + SSDataBlock *output = taosArrayGetP(pResList, i); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; - // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, pResList, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, pItem->level, terrstr()); - goto _err; - } + // TODO: the schema update should be handled later(TD-17965) + if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { + smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } - SSDataBlock *pBlk = (SSDataBlock *)taosArrayGet(pResList, 0); - if (pReq && tdProcessSubmitReq(sinkTsdb, pBlk->info.version, pReq) < 0) { + if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { + taosMemoryFreeClear(pReq); + smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); + goto _err; + } taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " version %" PRIi64 - " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); - goto _err; + + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, + SMA_VID(pSma), suid, pItem->level, output->info.version); + } - - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version %" PRIi64, SMA_VID(pSma), - suid, pItem->level); - - taosMemoryFreeClear(pReq); } taosArrayDestroy(pResList); @@ -1385,7 +1388,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); }