From 498cbc62fc4438b15b59e2bb569281405efb21e7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 30 Apr 2022 15:13:18 +0800 Subject: [PATCH 1/4] fix(query): update the capacity for ssdatablock when merge new blocks. --- include/common/tcommon.h | 2 +- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 52 +++++++++++++++---------- source/libs/executor/src/executorimpl.c | 7 ++-- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index c0eb53d8ca..16653ebfee 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -70,7 +70,7 @@ typedef struct SDataBlockInfo { uint64_t groupId; // no need to serialize int16_t numOfCols; int16_t hasVarCol; - int16_t capacity; + int32_t capacity; } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 71e2e4fba3..0d5262fe29 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -183,7 +183,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u } int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4a6fef4626..6619f46da0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -168,13 +168,6 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c uint32_t total = numOfRow1 + numOfRow2; - if (BitmapLen(numOfRow1) < BitmapLen(total)) { - char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(total)); - uint32_t extend = BitmapLen(total) - BitmapLen(numOfRow1); - memset(tmp + BitmapLen(numOfRow1), 0, extend); - pColumnInfoData->nullbitmap = tmp; - } - uint32_t remindBits = BitPos(numOfRow1); uint32_t shiftBits = 8 - remindBits; @@ -209,10 +202,9 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c } } -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, - uint32_t numOfRow2) { +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, + const SColumnInfoData* pSource, uint32_t numOfRow2) { ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); - if (numOfRow2 == 0) { return numOfRow1; } @@ -221,14 +213,19 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co pColumnInfoData->hasNull = pSource->hasNull; } + uint32_t finalNumOfRows = numOfRow1 + numOfRow2; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { // Handle the bitmap - char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + if (finalNumOfRows > *capacity) { + char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + *capacity = finalNumOfRows; + pColumnInfoData->varmeta.offset = (int32_t*)p; } - pColumnInfoData->varmeta.offset = (int32_t*)p; for (int32_t i = 0; i < numOfRow2; ++i) { if (pSource->varmeta.offset[i] == -1) { pColumnInfoData->varmeta.offset[i + numOfRow1] = -1; @@ -253,15 +250,27 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); pColumnInfoData->varmeta.length = len + oldLen; } else { - doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); + if (finalNumOfRows > *capacity) { + char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows); + if (tmp == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } - int32_t newSize = (numOfRow1 + numOfRow2) * pColumnInfoData->info.bytes; - char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newSize); - if (tmp == NULL) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + pColumnInfoData->pData = tmp; + + if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) { + char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows)); + uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1); + memset(btmp + BitmapLen(numOfRow1), 0, extend); + + pColumnInfoData->nullbitmap = btmp; + } + + *capacity = finalNumOfRows; } - pColumnInfoData->pData = tmp; + doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); + int32_t offset = pColumnInfoData->info.bytes * numOfRow1; memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2); } @@ -357,6 +366,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd if(pIndexMap) { mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); } + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); @@ -367,7 +377,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd char* tmp = taosMemoryRealloc(pCol2->pData, newSize); if (tmp != NULL) { pCol2->pData = tmp; - colDataMergeCol(pCol2, pDest->info.rows, pCol1, pSrc->info.rows); + colDataMergeCol(pCol2, pDest->info.rows, &pDest->info.capacity, pCol1, pSrc->info.rows); } else { return TSDB_CODE_VND_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e38f757f10..9e5e17a053 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1199,7 +1199,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); + colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0], pfCtx->input.numOfRows); } else { colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); } @@ -1227,7 +1227,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); + colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1268,7 +1268,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); + colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -7170,7 +7170,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - // SOptrBasicInfo* pInfo = &pJoinInfo->binfo; SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes); From 912e0df6bb4e23e993f61984dac0dd0dd3a66336 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 30 Apr 2022 15:31:43 +0800 Subject: [PATCH 2/4] fix(query): set the correct memory buffer size when merging two column data. --- source/common/src/tdatablock.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 6619f46da0..e237f61e20 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -361,6 +361,8 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd assert(pSrc != NULL && pDest != NULL); int32_t numOfCols = pDest->info.numOfCols; + int32_t capacity = pDest->info.capacity; + for (int32_t i = 0; i < numOfCols; ++i) { int32_t mapIndex = i; if(pIndexMap) { @@ -377,12 +379,14 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd char* tmp = taosMemoryRealloc(pCol2->pData, newSize); if (tmp != NULL) { pCol2->pData = tmp; - colDataMergeCol(pCol2, pDest->info.rows, &pDest->info.capacity, pCol1, pSrc->info.rows); + capacity = pDest->info.capacity; + colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); } else { return TSDB_CODE_VND_OUT_OF_MEMORY; } } + pDest->info.capacity = capacity; pDest->info.rows += pSrc->info.rows; return TSDB_CODE_SUCCESS; } From 2bbeeb5c5b69c3c826868564c683847fdcafc822 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 30 Apr 2022 15:44:06 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/common/src/tdatablock.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e237f61e20..b065ba4fd4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -251,7 +251,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in pColumnInfoData->varmeta.length = len + oldLen; } else { if (finalNumOfRows > *capacity) { - char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows); + char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); if (tmp == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; } @@ -372,18 +372,18 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); - uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows); - uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows); +// uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows); +// uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows); - int32_t newSize = oldLen + newLen; - char* tmp = taosMemoryRealloc(pCol2->pData, newSize); - if (tmp != NULL) { - pCol2->pData = tmp; +// int32_t newSize = oldLen + newLen; +// char* tmp = taosMemoryRealloc(pCol2->pData, newSize); +// if (tmp != NULL) { +// pCol2->pData = tmp; capacity = pDest->info.capacity; colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); - } else { - return TSDB_CODE_VND_OUT_OF_MEMORY; - } +// } else { +// return TSDB_CODE_VND_OUT_OF_MEMORY; +// } } pDest->info.capacity = capacity; From c52ca4cce8f96d8e3a24b29d95ae48a93110f103 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 30 Apr 2022 15:45:10 +0800 Subject: [PATCH 4/4] refactor: do some internal refactor. --- source/common/src/tdatablock.c | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b065ba4fd4..e703063dfa 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -359,31 +359,19 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { // if pIndexMap = NULL, merger one column by on column int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap) { assert(pSrc != NULL && pDest != NULL); - - int32_t numOfCols = pDest->info.numOfCols; int32_t capacity = pDest->info.capacity; - for (int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { int32_t mapIndex = i; - if(pIndexMap) { + if (pIndexMap) { mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); } SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); -// uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows); -// uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows); - -// int32_t newSize = oldLen + newLen; -// char* tmp = taosMemoryRealloc(pCol2->pData, newSize); -// if (tmp != NULL) { -// pCol2->pData = tmp; - capacity = pDest->info.capacity; - colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); -// } else { -// return TSDB_CODE_VND_OUT_OF_MEMORY; -// } + capacity = pDest->info.capacity; + colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); } pDest->info.capacity = capacity;