From c9a1b3ba011878265d1e2f08bd4933ee8385a711 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 Jan 2023 16:24:29 +0800 Subject: [PATCH 01/17] fix(query): handle the multi-group limit/offset in table group merge scan/ multiway-merge executor. --- source/libs/executor/inc/executorimpl.h | 3 ++- source/libs/executor/src/exchangeoperator.c | 11 ++++++--- source/libs/executor/src/executil.c | 5 ++++ source/libs/executor/src/projectoperator.c | 12 ++++++--- source/libs/executor/src/scanoperator.c | 27 +++++++++++++++------ source/libs/executor/src/sortoperator.c | 11 +++++---- 6 files changed, 48 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fffe687ff6..c68f7c4697 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -705,7 +705,8 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); -void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); +void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 0de2836f55..037b33dc9f 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -738,9 +738,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } // reset the value for a new group data - pLimitInfo->numOfOutputRows = 0; - pLimitInfo->remainOffset = pLimitInfo->limit.offset; - + resetLimitInfoForNextGroup(pLimitInfo); // existing rows that belongs to previous group. if (pBlock->info.rows > 0) { return PROJECT_RETRIEVE_DONE; @@ -766,7 +764,12 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { - pOperator->status = OP_EXEC_DONE; + setOperatorCompleted(pOperator); + } else { + // current group limitation is reached, and future blocks of this group need to be discarded. 
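// an empty block at this point means the row limit had already been hit before this block
// arrived, so keep pulling until rows of the next group show up; a non-empty block still
// carries the last rows of the current group and has to be returned downstream first.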
+ if (pBlock->info.rows == 0) { + return PROJECT_RETRIEVE_CONTINUE; + } } return PROJECT_RETRIEVE_DONE; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 36981f501e..757324a773 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1759,6 +1759,11 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainGroupOffset = slimit.offset; } +void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) { + pLimitInfo->numOfOutputRows = 0; + pLimitInfo->remainOffset = pLimitInfo->limit.offset; +} + uint64_t tableListGetSize(const STableListInfo* pTableList) { ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); return taosArrayGetSize(pTableList->pTableList); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4d38f2c8e9..3e3610827b 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -175,8 +175,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S // reset the value for a new group data // existing rows that belongs to previous group. - pLimitInfo->numOfOutputRows = 0; - pLimitInfo->remainOffset = pLimitInfo->limit.offset; + resetLimitInfoForNextGroup(pLimitInfo); } return PROJECT_RETRIEVE_DONE; @@ -200,10 +199,18 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); + // TODO: optimize it later when partition by + limit + // all retrieved requirement has been fulfilled, let's finish this if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); + } else { + // Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data + // from next group. So let's continue this retrieve process + if (keepRows == 0) { + return PROJECT_RETRIEVE_CONTINUE; + } } } @@ -357,7 +364,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - // printDataBlock1(p, "project"); return (p->info.rows > 0) ? 
p : NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eb38299938..813763fffa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -257,7 +257,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo } // todo handle the slimit info -void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { SLimit* pLimit = &pLimitInfo->limit; const char* id = GET_TASKID(pTaskInfo); @@ -266,6 +266,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo pLimitInfo->remainOffset -= pBlock->info.rows; blockDataEmpty(pBlock); qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id); + return false; } else { blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; @@ -274,13 +275,14 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) { // limit the output rows - int32_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit; - int32_t keep = pBlock->info.rows - overflowRows; + int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keep); qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); - pOperator->status = OP_EXEC_DONE; + return true; } + + return false; } static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, @@ -391,7 +393,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca } } - applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); + bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); + if (limitReached) { // set operator flag is done + setOperatorCompleted(pOperator); + } pCost->totalRows += pBlock->info.rows; pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows; @@ -768,8 +773,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // reset value for the next group data output pOperator->status = OP_OPENED; - pInfo->base.limitInfo.numOfOutputRows = 0; - pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset; + resetLimitInfoForNextGroup(&pInfo->base.limitInfo); int32_t num = 0; STableKeyInfo* pList = NULL; @@ -2685,9 +2689,12 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { taosArrayDestroy(pInfo->queryConds); pInfo->queryConds = NULL; + resetLimitInfoForNextGroup(&pInfo->limitInfo); return TSDB_CODE_SUCCESS; } +// all data produced by this function only belongs to one group +// slimit/soffset does not need to be concerned here, since this function only deal with data within one group. 
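// group-level limits are handled by the caller instead: doTableMergeScan switches groups via
// stopGroupTableMergeScan/startGroupTableMergeScan, and stopGroupTableMergeScan now calls
// resetLimitInfoForNextGroup so the per-group limit/offset starts fresh for the next group;
// applyLimitOffset below only enforces the row limit/offset of the current group.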
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; @@ -2707,10 +2714,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator); pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows; + qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, + pInfo->limitInfo.numOfOutputRows); + return (pResBlock->info.rows > 0) ? pResBlock : NULL; } @@ -2749,11 +2758,13 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { + // Data of this group are all dumped, let's try the next group stopGroupTableMergeScan(pOperator); if (pInfo->tableEndIndex >= tableListSize - 1) { setOperatorCompleted(pOperator); break; } + pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index f5dc6cc623..97b4fd9dc4 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -680,11 +680,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData break; } + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); + if (limitReached) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } + if (p->info.rows > 0) { - applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); - if (p->info.rows > 0) { - break; - } + break; } } @@ -698,7 +700,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } - pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; pDataBlock->info.id.groupId = pInfo->groupId; pDataBlock->info.dataLoad = 1; From f1d62626303e6dd33bcde6a3005186829db385fc Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 18 Jan 2023 12:26:01 +0800 Subject: [PATCH 02/17] enh: optimize execute plan when 'tail' function is used with 'partition by' clause --- source/libs/planner/src/planOptimizer.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 03e0275d7d..e901297f4d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1080,29 +1080,29 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) { return false; } SSortLogicNode* pSort = (SSortLogicNode*)pNode; - if (pSort->groupSort || !sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) { + if (!sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) { return false; } return true; } -static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNotOptimize, +static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, bool* pNotOptimize, SNodeList** pSequencingNodes) { switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: { SScanLogicNode* pScan = (SScanLogicNode*)pNode; - if (NULL != 
pScan->pGroupTags || TSDB_SYSTEM_TABLE == pScan->tableType) { + if ((!groupSort && NULL != pScan->pGroupTags) || TSDB_SYSTEM_TABLE == pScan->tableType) { *pNotOptimize = true; return TSDB_CODE_SUCCESS; } return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode); } case QUERY_NODE_LOGIC_PLAN_JOIN: { - int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), + int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort, pNotOptimize, pSequencingNodes); if (TSDB_CODE_SUCCESS == code) { - code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, - pSequencingNodes); + code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), groupSort, + pNotOptimize, pSequencingNodes); } return code; } @@ -1121,13 +1121,13 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNot return TSDB_CODE_SUCCESS; } - return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, - pSequencingNodes); + return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort, + pNotOptimize, pSequencingNodes); } -static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, SNodeList** pSequencingNodes) { +static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, bool groupSort, SNodeList** pSequencingNodes) { bool notOptimize = false; - int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, ¬Optimize, pSequencingNodes); + int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, groupSort, ¬Optimize, pSequencingNodes); if (TSDB_CODE_SUCCESS != code || notOptimize) { NODES_CLEAR_LIST(*pSequencingNodes); } @@ -1175,8 +1175,8 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS static int32_t sortPrimaryKeyOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort) { SNodeList* pSequencingNodes = NULL; - int32_t code = - sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pSequencingNodes); + int32_t code = sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), + pSort->groupSort, &pSequencingNodes); if (TSDB_CODE_SUCCESS == code && NULL != pSequencingNodes) { code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pSequencingNodes); } From d130d1a84af664b1620049ffe5415e328a2ade79 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 18 Jan 2023 17:13:35 +0800 Subject: [PATCH 03/17] fix: atexit memory leak issue --- source/libs/executor/src/executor.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 814ead57f0..9f5f7eca98 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -24,7 +24,10 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; -static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } +static void initRefPool() { + exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); + atexit(cleanupRefPool); +} static void cleanupRefPool() { int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0); taosCloseRef(ref); @@ -442,7 +445,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, 
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; taosThreadOnce(&initPoolOnce, initRefPool); - atexit(cleanupRefPool); qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); From 2d579cfc16d60d35764007a832bca566557c698c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 18 Jan 2023 17:27:27 +0800 Subject: [PATCH 04/17] fix: fix compile issue --- source/libs/executor/src/executor.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9f5f7eca98..6c354c3d61 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -24,15 +24,16 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; -static void initRefPool() { - exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); - atexit(cleanupRefPool); -} static void cleanupRefPool() { int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0); taosCloseRef(ref); } +static void initRefPool() { + exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); + atexit(cleanupRefPool); +} + static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { From 0251750c895a24117dc8a1652277b38bb9468235 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 20 Jan 2023 16:45:09 +0800 Subject: [PATCH 05/17] fix: packaging/tools/install.sh for main (#19649) --- packaging/tools/install.sh | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 2a078b5eab..4be179b04d 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -743,6 +743,34 @@ function is_version_compatible() { esac } +deb_erase() { + confirm="" + while [ "" == "${confirm}" ]; do + echo -e -n "${RED}Exist tdengine deb detected, do you want to remove it? [yes|no] ${NC}:" + read confirm + if [ "yes" == "$confirm" ]; then + ${csudo}dpkg --remove tdengine ||: + break + elif [ "no" == "$confirm" ]; then + break + fi + done +} + +rpm_erase() { + confirm="" + while [ "" == "${confirm}" ]; do + echo -e -n "${RED}Exist tdengine rpm detected, do you want to remove it? [yes|no] ${NC}:" + read confirm + if [ "yes" == "$confirm" ]; then + ${csudo}rpm -e tdengine ||: + break + elif [ "no" == "$confirm" ]; then + break + fi + done +} + function updateProduct() { # Check if version compatible if ! 
is_version_compatible; then @@ -755,6 +783,13 @@ function updateProduct() { echo "File ${tarName} does not exist" exit 1 fi + + if echo $osinfo | grep -qwi "centos"; then + rpm -q tdengine 2>&1 > /dev/null && rpm_erase tdengine ||: + elif echo $osinfo | grep -qwi "ubuntu"; then + dpkg -l tdengine 2>&1 > /dev/null && deb_erase tdengine ||: + fi + tar -zxf ${tarName} install_jemalloc From 490cbaf55b8be37e0395317869244ef5211bc86c Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 20 Jan 2023 22:09:37 +0800 Subject: [PATCH 06/17] fix: reenable -e in release.sh for main (#19650) * fix: taos-tools deb/rpm compn for main * fix: update taos-tools 5aa25e9 * fix: reenable -e in release.sh --- packaging/release.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/release.sh b/packaging/release.sh index e442e07c6a..1dfbf2b112 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -2,7 +2,7 @@ # # Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os -# set -e +set -e # set -x # release.sh -v [cluster | edge] From 39b69ba39e8f25c3cafd367f0f08f12580413d2b Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 21 Jan 2023 13:44:25 +0800 Subject: [PATCH 07/17] fix: row schema version for row merger --- source/dnode/vnode/src/tsdb/tsdbRead.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ee93756f0e..85282a2340 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1758,11 +1758,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == k.ts) { + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } if (init) { - tRowMerge(&merge, pRow); + tRowMergerAdd(&merge, pRow, pSchema); } else { init = true; - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); int32_t code = tRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; From 2416d34a80068e721fdf25c349ec020c7d4e600f Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 21 Jan 2023 16:07:27 +0800 Subject: [PATCH 08/17] fix: memory leak for row merge --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 49 +++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 86adc1dc80..030e482bf1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -722,7 +722,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { pTColumn = &pMerger->pTSchema->columns[iCol]; if (pTSchema->columns[jCol].colId < pTColumn->colId) { ++jCol; - --iCol; + // --iCol; continue; } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { continue; @@ -731,6 +731,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); if (key.version > pMerger->version) { +#if 0 if (!COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol); @@ -743,10 +744,34 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { } tColVal->flag = 0; } else { + + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } 
+#endif + if (!COL_VAL_IS_NONE(pColVal)) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); + if (!COL_VAL_IS_NULL(pColVal)) { + code = tRealloc(&pTColVal->value.pData, pColVal->value.nData); + if (code) return code; + + pTColVal->value.nData = pColVal->value.nData; + if (pTColVal->value.nData) { + memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); + } + pTColVal->flag = 0; + } else { + tFree(pTColVal->value.pData); + pTColVal->value.pData = NULL; + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else { taosArraySet(pMerger->pArray, iCol, pColVal); } } } else if (key.version < pMerger->version) { +#if 0 SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { @@ -762,6 +787,28 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { taosArraySet(pMerger->pArray, iCol, pColVal); } } +#endif + SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); + if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (!COL_VAL_IS_NULL(pColVal)) { + code = tRealloc(&tColVal->value.pData, pColVal->value.nData); + if (code) return code; + + tColVal->value.nData = pColVal->value.nData; + if (tColVal->value.nData) { + memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData); + } + tColVal->flag = 0; + } else { + tFree(tColVal->value.pData); + tColVal->value.pData = NULL; + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } } else { ASSERT(0 && "dup versions not allowed"); } From 338a2a74cb48f6a6d266b52c9702fc3160730422 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 21 Jan 2023 16:08:33 +0800 Subject: [PATCH 09/17] chore: revert the code --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 030e482bf1..8da0b268fb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -722,7 +722,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { pTColumn = &pMerger->pTSchema->columns[iCol]; if (pTSchema->columns[jCol].colId < pTColumn->colId) { ++jCol; - // --iCol; + --iCol; continue; } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { continue; From 08a5ed4a076d74bcb5ca7ec0110b6bfe7b24824a Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 21 Jan 2023 16:11:33 +0800 Subject: [PATCH 10/17] chore: revert the code --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8da0b268fb..6cc8540d38 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -744,7 +744,6 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { } tColVal->flag = 0; } else { - taosArraySet(pMerger->pArray, iCol, pColVal); } } From 1e06fb17836c7f8f5f030a1c8ede8ad654032e40 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 21 Jan 2023 16:15:12 +0800 Subject: [PATCH 11/17] chore: code optimization --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c 
b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 6cc8540d38..ab10960970 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -770,7 +770,6 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { } } } else if (key.version < pMerger->version) { -#if 0 SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { @@ -786,28 +785,6 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { taosArraySet(pMerger->pArray, iCol, pColVal); } } -#endif - SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); - if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - if (IS_VAR_DATA_TYPE(pColVal->type)) { - if (!COL_VAL_IS_NULL(pColVal)) { - code = tRealloc(&tColVal->value.pData, pColVal->value.nData); - if (code) return code; - - tColVal->value.nData = pColVal->value.nData; - if (tColVal->value.nData) { - memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData); - } - tColVal->flag = 0; - } else { - tFree(tColVal->value.pData); - tColVal->value.pData = NULL; - taosArraySet(pMerger->pArray, iCol, pColVal); - } - } else { - taosArraySet(pMerger->pArray, iCol, pColVal); - } - } } else { ASSERT(0 && "dup versions not allowed"); } From 21d57a36240c506b47313911375b4e82c906c354 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 22 Jan 2023 01:45:29 +0800 Subject: [PATCH 12/17] fix(query): check for failure during add new buf pages. --- source/libs/executor/src/groupoperator.c | 10 ++- source/util/src/tpagedbuf.c | 104 +++++++++++------------ 2 files changed, 60 insertions(+), 54 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5676e19cdf..bf4b9a2599 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); - taosArrayPush(p->pPageList, &pageId); + if (pPage == NULL) { + return pPage; + } + taosArrayPush(p->pPageList, &pageId); *(int32_t*)pPage = 0; } else { int32_t* curId = taosArrayGetLast(p->pPageList); @@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); + if (pPage == NULL) { + qError("failed to get new buffer, code:%s", tstrerror(terrno)); + return NULL; + } + taosArrayPush(p->pPageList, &pageId); memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 87b44b2d13..f696a02e6e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -222,7 +222,6 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { char* p = doFlushPageToDisk(pBuf, pg); setPageNotInBuf(pg); pg->dirty = false; - return p; } @@ -286,32 +285,21 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { } static char* evacOneDataPage(SDiskbasedBuf* pBuf) { - char* bufPage = NULL; SListNode* pn = getEldestUnrefedPage(pBuf); - terrno = 0; - - // all pages are referenced by user, try to allocate new space - if (pn == NULL) { - int32_t prev = pBuf->inMemPages; - - // increase by 50% of previous mem pages - pBuf->inMemPages = 
(int32_t)(pBuf->inMemPages * 1.5f); - - // qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev, - // pBuf->inMemPages, pBuf->pageSize); - } else { - tdListPopNode(pBuf->lruList, pn); - - SPageInfo* d = *(SPageInfo**)pn->data; - ASSERTS(d->pn == pn, "d->pn not equal pn"); - - d->pn = NULL; - taosMemoryFreeClear(pn); - - bufPage = flushPageToDisk(pBuf, d); + if (pn == NULL) { // no available buffer pages now, return. + return NULL; } - return bufPage; + terrno = 0; + tdListPopNode(pBuf->lruList, pn); + + SPageInfo* d = *(SPageInfo**)pn->data; + ASSERTS(d->pn == pn, "d->pn not equal pn"); + + d->pn = NULL; + taosMemoryFreeClear(pn); + + return flushPageToDisk(pBuf, d); } static void lruListPushFront(SList* pList, SPageInfo* pi) { @@ -338,7 +326,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem SDiskbasedBuf* pPBuf = *pBuf; if (pPBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + goto _error; } pPBuf->pageSize = pagesize; @@ -362,24 +350,50 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES - pPBuf->all = taosHashInit(10, fn, true, false); - pPBuf->prefix = (char*) dir; + if (pPBuf->assistBuf == NULL) { + goto _error; + } + pPBuf->all = taosHashInit(10, fn, true, false); + if (pPBuf->all == NULL) { + goto _error; + } + + pPBuf->prefix = (char*) dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, - // pPBuf->pageSize, - // pPBuf->inMemPages, pPBuf->path); + // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); return TSDB_CODE_SUCCESS; + _error: + destroyDiskbasedBuf(pPBuf); + return TSDB_CODE_OUT_OF_MEMORY; +} + +static char* doExtractPage(SDiskbasedBuf* pBuf) { + char* availablePage = NULL; + if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { + availablePage = evacOneDataPage(pBuf); + if (availablePage == NULL) { + uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) + } + } else { + availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. + if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + } + + return availablePage; } void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->statis.getPages += 1; - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); + char* availablePage = doExtractPage(pBuf); + if (availablePage == NULL) { + return NULL; } SPageInfo* pi = NULL; @@ -402,16 +416,8 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { } // add to LRU list - ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, pi); - - // allocate buf - if (availablePage == NULL) { - pi->pData = - taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. 
- } else { - pi->pData = availablePage; - } + pi->pData = availablePage; ((void**)pi->pData)[0] = pi; #ifdef BUF_PAGE_DEBUG @@ -447,18 +453,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); - if (availablePage == NULL) { - return NULL; - } - } - - if (availablePage == NULL) { - (*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); - } else { - (*pi)->pData = availablePage; + (*pi)->pData = doExtractPage(pBuf); + if ((*pi)->pData == NULL) { + return NULL; } // set the ptr to the new SPageInfo @@ -471,6 +468,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { if ((*pi)->length > 0 && (*pi)->offset >= 0) { int32_t code = loadPageFromDisk(pBuf, *pi); if (code != 0) { + terrno = code; return NULL; } } From 0abd58a95e6cdaf343dd92402eb429fc3e55937d Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sun, 22 Jan 2023 20:33:39 +0800 Subject: [PATCH 13/17] feat: taosbenchmark specfying vgroups for main (#19660) --- cmake/taostools_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 1e2247eedc..a69e1578c0 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 5c53cc8 + GIT_TAG 7d24ed5 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE From c1ffbb5c1ccdd0143196c4cffa227a88745a55db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 23 Jan 2023 01:11:04 +0800 Subject: [PATCH 14/17] refactor:do some internal refactor. 
--- source/libs/function/src/builtinsimpl.c | 4 -- source/libs/function/src/tpercentile.c | 13 +++--- source/util/src/tpagedbuf.c | 60 +++++++++++++++---------- 3 files changed, 43 insertions(+), 34 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 324011f238..e4081ddf0d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { @@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); @@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } memcpy(pPage->data + pPos->offset, pBuf, length); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 871b668cfd..6b85155486 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) if (pg == NULL) { return NULL; } - memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); + memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); } @@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) { int32_t *pageId = taosArrayGet(list, 0); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); if (pPage == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } ASSERT(pPage->num == 1); @@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, return NULL; } - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir); + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; @@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); if (pSlot->info.data == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } pSlot->info.pageId = pageId; taosArrayPush(pPageIdList, &pageId); @@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction // data in buffer and file are merged together to be processed. 
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); if (buffer == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } + int32_t currentIdx = count - num; char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; @@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction int32_t *pageId = taosArrayGet(list, f); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); if (pg == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index f696a02e6e..99f555e20d 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -5,7 +5,7 @@ #include "thash.h" #include "tlog.h" -#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) typedef struct SPageDiskInfo { @@ -89,7 +89,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba return data; } -static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { +static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) { if (pBuf->pFree == NULL) { return pBuf->nextPos; } else { @@ -114,8 +114,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } -static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } - /** * +--------------------------+-------------------+--------------+ * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| @@ -124,13 +122,16 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize * @param pg * @return */ -static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { + +static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } + +static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { ASSERT(!pg->used && pg->pData != NULL); int32_t size = pBuf->pageSize; char* t = NULL; if (pg->offset == -1 || pg->dirty) { - void* payload = GET_DATA_PAYLOAD(pg); + void* payload = GET_PAYLOAD_DATA(pg); t = doCompressData(payload, pBuf->pageSize, &size, pBuf); ASSERTS(size >= 0, "size is negative"); } @@ -140,7 +141,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { if (pg->offset == -1) { ASSERTS(pg->dirty == true, "pg->dirty is false"); - pg->offset = allocatePositionInFile(pBuf, size); + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); @@ -169,7 +170,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { taosArrayPush(pBuf->pFree, &dinfo); // 2. 
allocate new position, and update the info - pg->offset = allocatePositionInFile(pBuf, size); + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; } @@ -208,9 +209,8 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return pDataBuf; } -static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { +static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; - ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages); if (pBuf->pFile == NULL) { if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { @@ -219,7 +219,7 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } - char* p = doFlushPageToDisk(pBuf, pg); + char* p = doFlushBufPage(pBuf, pg); setPageNotInBuf(pg); pg->dirty = false; return p; @@ -233,7 +233,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return ret; } - void* pPage = (void*)GET_DATA_PAYLOAD(pg); + void* pPage = (void*)GET_PAYLOAD_DATA(pg); ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length); if (ret != pg->length) { ret = TAOS_SYSTEM_ERROR(errno); @@ -252,6 +252,10 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); + if (ppi == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } ppi->pageId = pageId; ppi->pData = NULL; @@ -274,17 +278,14 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn); if (!pageInfo->used) { - // printf("%d is chosen\n", pageInfo->pageId); break; - } else { - // printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); } } return pn; } -static char* evacOneDataPage(SDiskbasedBuf* pBuf) { +static char* evictBufPage(SDiskbasedBuf* pBuf) { SListNode* pn = getEldestUnrefedPage(pBuf); if (pn == NULL) { // no available buffer pages now, return. 
return NULL; @@ -299,7 +300,7 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) { d->pn = NULL; taosMemoryFreeClear(pn); - return flushPageToDisk(pBuf, d); + return flushBufPage(pBuf, d); } static void lruListPushFront(SList* pList, SPageInfo* pi) { @@ -374,8 +375,9 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem static char* doExtractPage(SDiskbasedBuf* pBuf) { char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); + availablePage = evictBufPage(pBuf); if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) } } else { @@ -409,6 +411,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { // register page id info pi = registerPage(pBuf, *pageId); + if (pi == NULL) { + return NULL; + } // add to hash map taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); @@ -423,11 +428,15 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { #ifdef BUF_PAGE_DEBUG uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset); #endif - return (void*)(GET_DATA_PAYLOAD(pi)); + + return (void*)(GET_PAYLOAD_DATA(pi)); } void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { - ASSERT(pBuf != NULL && id >= 0); + if (id < 0) { + return NULL; + } + pBuf->statis.getPages += 1; SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); @@ -437,7 +446,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { // no need to update the LRU list if only one page exists if (pBuf->numOfPages == 1) { (*pi)->used = true; - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); @@ -445,10 +454,11 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; + #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } else { // not in memory ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); @@ -475,14 +485,15 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } } void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { - if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) { + if (page == NULL) { return; } + SPageInfo* ppi = getPageInfoFromPayload(page); releaseBufPageInfo(pBuf, ppi); } @@ -491,6 +502,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { #ifdef BUF_PAGE_DEBUG uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset); #endif + if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) { return; } From 670eec4db5a572f949252121fd1b05acf7b4a83f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Jan 2023 22:54:09 +0800 Subject: [PATCH 15/17] refactor: remove assert --- source/util/src/tarray.c | 42 ++++++++++++++++++------------------- source/util/src/tpagedbuf.c | 27 ++++++++++++++++++------ 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 
4bd8294423..237edfdeb2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -20,7 +20,10 @@ // todo refactor API SArray* taosArrayInit(size_t size, size_t elemSize) { - assert(elemSize > 0); + if (elemSize == 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } if (size < TARRAY_MIN_SIZE) { size = TARRAY_MIN_SIZE; @@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) { } void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - assert(pArray); - size_t size = pArray->size; if (size <= 1) { return; @@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) } void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - assert(pArray); - size_t size = pArray->size; if (size <= 1) { return; @@ -195,11 +194,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) { } void* taosArrayPop(SArray* pArray) { - assert(pArray != NULL); - if (pArray->size == 0) { return NULL; } + pArray->size -= 1; return TARRAY_GET_ELEM(pArray, pArray->size); } @@ -208,16 +206,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) { if (NULL == pArray) { return NULL; } - assert(index < pArray->size); + + if (index >= pArray->size) { + uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity); + return NULL; + } + return TARRAY_GET_ELEM(pArray, index); } void* taosArrayGetP(const SArray* pArray, size_t index) { - assert(index < pArray->size); - - void* d = TARRAY_GET_ELEM(pArray, index); - - return *(void**)d; + void** p = taosArrayGet(pArray, index); + if (p == NULL) { + return NULL; + } + return *p; } void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); } @@ -296,9 +299,12 @@ void taosArrayRemove(SArray* pArray, size_t index) { } SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { - assert(src != NULL && elemSize > 0); - SArray* pDst = taosArrayInit(size, elemSize); + if (elemSize <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + SArray* pDst = taosArrayInit(size, elemSize); memcpy(pDst->pData, src, elemSize * size); pDst->size = size; @@ -306,8 +312,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { } SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { - assert(pSrc != NULL); - if (pSrc->size == 0) { // empty array list return taosArrayInit(8, pSrc->elemSize); } @@ -399,14 +403,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) { } void taosArraySort(SArray* pArray, __compar_fn_t compar) { - ASSERT(pArray != NULL && compar != NULL); taosSort(pArray->pData, pArray->size, pArray->elemSize, compar); } void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { - assert(pArray != NULL && comparFn != NULL); - assert(key != NULL); - return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 99f555e20d..d4103ab578 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -126,7 +126,11 @@ static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { - ASSERT(!pg->used && pg->pData != NULL); + if (pg->pData == NULL || 
pg->used) { + uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } int32_t size = pBuf->pageSize; char* t = NULL; @@ -333,7 +337,6 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->pageSize = pagesize; pPBuf->numOfPages = 0; // all pages are in buffer in the first place pPBuf->totalBufSize = 0; - pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->allocateId = -1; pPBuf->pFile = NULL; pPBuf->id = strdup(id); @@ -342,13 +345,22 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->freePgList = tdListNew(POINTER_BYTES); // at least more than 2 pages must be in memory - ASSERT(inMemBufSize >= pagesize * 2); + if (inMemBufSize < pagesize * 2) { + inMemBufSize = pagesize * 2; + } + pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->lruList = tdListNew(POINTER_BYTES); + if (pPBuf->lruList == NULL) { + goto _error; + } // init id hash table _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); + if (pPBuf->pIdList == NULL) { + goto _error; + } pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES if (pPBuf->assistBuf == NULL) { @@ -503,7 +515,12 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset); #endif - if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) { + if (pi == NULL) { + return; + } + + if (pi->pData == NULL) { + uError("pi->pData (page data) is null"); return; } @@ -514,7 +531,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { - ASSERT(pBuf != NULL); return pBuf->pIdList; } @@ -592,7 +608,6 @@ SPageInfo* getLastPageInfo(SArray* pList) { } int32_t getPageId(const SPageInfo* pPgInfo) { - ASSERT(pPgInfo != NULL); return pPgInfo->pageId; } From fc80c3d3735a526417a7b81baad5c499ce78b048 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Jan 2023 23:56:29 +0800 Subject: [PATCH 16/17] refactor:do some internal refactor. --- source/util/src/tpagedbuf.c | 61 +++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index d4103ab578..7c60862c56 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -6,6 +6,9 @@ #include "tlog.h" #define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL) +#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL) +#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) typedef struct SPageDiskInfo { @@ -14,7 +17,7 @@ typedef struct SPageDiskInfo { } SPageDiskInfo, SFreeListItem; struct SPageInfo { - SListNode* pn; // point to list node struct + SListNode* pn; // point to list node struct. 
it is NULL when the page is evicted from the in-memory buffer void* pData; int64_t offset; int32_t pageId; @@ -112,8 +115,6 @@ static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) { } } -static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } - /** * +--------------------------+-------------------+--------------+ * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| @@ -134,17 +135,18 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { int32_t size = pBuf->pageSize; char* t = NULL; - if (pg->offset == -1 || pg->dirty) { + if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) { void* payload = GET_PAYLOAD_DATA(pg); t = doCompressData(payload, pBuf->pageSize, &size, pBuf); - ASSERTS(size >= 0, "size is negative"); + if (size < 0) { + uError("failed to compress data when flushing data to disk, %s", pBuf->id); + return NULL; + } } // this page is flushed to disk for the first time if (pg->dirty) { - if (pg->offset == -1) { - ASSERTS(pg->dirty == true, "pg->dirty is false"); - + if (!HAS_DATA_IN_DISK(pg)) { pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; @@ -160,6 +162,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { return NULL; } + // extend the file size if (pBuf->fileSize < pg->offset + size) { pBuf->fileSize = pg->offset + size; } @@ -202,13 +205,13 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { size = pg->length; } - ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1)); - char* pDataBuf = pg->pData; memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize)); + #ifdef BUF_PAGE_DEBUG uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset); #endif + pg->length = size; // on disk size return pDataBuf; } @@ -224,13 +227,19 @@ static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { } char* p = doFlushBufPage(pBuf, pg); - setPageNotInBuf(pg); + CLEAR_BUF_PAGE_IN_MEM_FLAG(pg); + pg->dirty = false; return p; } // load file block data in disk static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { + if (pg->offset < 0 || pg->length <= 0) { + uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id); + return TSDB_CODE_INVALID_PARA; + } + int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); if (ret == -1) { ret = TAOS_SYSTEM_ERROR(errno); @@ -252,7 +261,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return 0; } -static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { +static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); @@ -279,7 +288,9 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SListNode* pn = NULL; while ((pn = tdListNext(&iter)) != NULL) { SPageInfo* pageInfo = *(SPageInfo**)pn->data; - ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn); + + SPageInfo* p = *(SPageInfo**)(pageInfo->pData); + ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo); if (!pageInfo->used) { break; @@ -299,7 +310,6 @@ static char* evictBufPage(SDiskbasedBuf* pBuf) { tdListPopNode(pBuf->lruList, pn); SPageInfo* d = *(SPageInfo**)pn->data; - ASSERTS(d->pn == pn, "d->pn not equal pn"); d->pn = NULL; taosMemoryFreeClear(pn); @@ -422,7 +432,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { *pageId = (++pBuf->allocateId); // register page id info - pi = registerPage(pBuf, *pageId); + pi = 
registerNewPageInfo(pBuf, *pageId); if (pi == NULL) { return NULL; } @@ -446,15 +456,21 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { if (id < 0) { + terrno = TSDB_CODE_INVALID_PARA; + uError("invalid page id:%d, %s", id, pBuf->id); return NULL; } pBuf->statis.getPages += 1; SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); - ASSERT(pi != NULL && *pi != NULL); + if (pi == NULL || *pi == NULL) { + uError("failed to locate the buffer page:%d, %s", id, pBuf->id); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } - if ((*pi)->pData != NULL) { // it is in memory + if (BUF_PAGE_IN_MEM(*pi)) { // it is in memory // no need to update the LRU list if only one page exists if (pBuf->numOfPages == 1) { (*pi)->used = true; @@ -462,7 +478,10 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { } SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); - ASSERT(*pInfo == *pi); + if (*pInfo != *pi) { + uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id); + return NULL; + } lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; @@ -472,10 +491,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { #endif return (void*)(GET_PAYLOAD_DATA(*pi)); } else { // not in memory - ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && + ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); (*pi)->pData = doExtractPage(pBuf); + + // failed to evict buffer page, return with error code. if ((*pi)->pData == NULL) { return NULL; } @@ -487,7 +508,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { (*pi)->used = true; // some data has been flushed to disk, and needs to be loaded into buffer again. - if ((*pi)->length > 0 && (*pi)->offset >= 0) { + if (HAS_DATA_IN_DISK(*pi)) { int32_t code = loadPageFromDisk(pBuf, *pi); if (code != 0) { terrno = code; From ab1f87d19f183bed690edb1c57bc0774dab076d9 Mon Sep 17 00:00:00 2001 From: freemine Date: Wed, 25 Jan 2023 15:32:09 +0800 Subject: [PATCH 17/17] typo correction (#19667) --- source/util/src/tcache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 7d1686ef80..761da6986b 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -921,7 +921,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; TdThreadOnce tmp = PTHREAD_ONCE_INIT; - if (memcmp(&cacheRefreshWorker, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL); + if (memcmp(&cacheThreadInit, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL); taosArrayDestroy(pCacheArrayList); }
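A note on the final fix above: a TdThreadOnce control variable keeps its PTHREAD_ONCE_INIT contents until taosThreadOnce() has actually fired, so comparing the once-control (cacheThreadInit) against a fresh initializer tells whether the refresh thread was ever started, whereas the old code compared the thread handle cacheRefreshWorker (the object later passed to taosThreadJoin) against a TdThreadOnce initializer. The following is a minimal sketch of the same start-once/join-on-stop pattern in plain pthreads; the names (workerLoop, startWorker, ensureWorkerStarted) are illustrative rather than TDengine symbols, and it assumes the platform's pthread_once_t changes its byte pattern after first use, which is implementation-specific but is the same assumption the patched code relies on.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>

static pthread_once_t workerInit = PTHREAD_ONCE_INIT;
static pthread_t      worker;
static atomic_bool    stopRequested = false;

static void *workerLoop(void *arg) {
  (void)arg;
  while (!atomic_load(&stopRequested)) {
    sleep(1);  // stand-in for the periodic cache-refresh work
  }
  return NULL;
}

static void startWorker(void) { (void)pthread_create(&worker, NULL, workerLoop, NULL); }

// may be called from many threads; the worker thread is created at most once
void ensureWorkerStarted(void) { (void)pthread_once(&workerInit, startWorker); }

void stopWorkerAndJoin(void) {
  atomic_store(&stopRequested, true);

  // Join only if pthread_once() ever fired: an untouched once-control still
  // byte-compares equal to PTHREAD_ONCE_INIT, so the thread handle is only
  // valid when this comparison fails.
  pthread_once_t fresh = PTHREAD_ONCE_INIT;
  if (memcmp(&workerInit, &fresh, sizeof(pthread_once_t)) != 0) {
    (void)pthread_join(worker, NULL);
  }
}

Checking the once-control rather than the handle avoids calling pthread_join on a pthread_t that was never created, which is undefined behaviour.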