diff --git a/include/common/tcommon.h b/include/common/tcommon.h index c0eb53d8ca..16653ebfee 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -70,7 +70,7 @@ typedef struct SDataBlockInfo { uint64_t groupId; // no need to serialize int16_t numOfCols; int16_t hasVarCol; - int16_t capacity; + int32_t capacity; } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 71e2e4fba3..0d5262fe29 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -183,7 +183,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u } int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4a6fef4626..e703063dfa 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -168,13 +168,6 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c uint32_t total = numOfRow1 + numOfRow2; - if (BitmapLen(numOfRow1) < BitmapLen(total)) { - char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(total)); - uint32_t extend = BitmapLen(total) - BitmapLen(numOfRow1); - memset(tmp + BitmapLen(numOfRow1), 0, extend); - pColumnInfoData->nullbitmap = tmp; - } - uint32_t remindBits = BitPos(numOfRow1); uint32_t shiftBits = 8 - remindBits; @@ -209,10 +202,9 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c } } -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, - uint32_t numOfRow2) { +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, + const SColumnInfoData* pSource, uint32_t numOfRow2) { ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); - if (numOfRow2 == 0) { return numOfRow1; } @@ -221,14 +213,19 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co pColumnInfoData->hasNull = pSource->hasNull; } + uint32_t finalNumOfRows = numOfRow1 + numOfRow2; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { // Handle the bitmap - char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + if (finalNumOfRows > *capacity) { + char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + *capacity = finalNumOfRows; + pColumnInfoData->varmeta.offset = (int32_t*)p; } - pColumnInfoData->varmeta.offset = (int32_t*)p; for (int32_t i = 0; i < numOfRow2; ++i) { if (pSource->varmeta.offset[i] == -1) { pColumnInfoData->varmeta.offset[i + numOfRow1] = -1; @@ -253,15 +250,27 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); pColumnInfoData->varmeta.length = len + oldLen; } else { - doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); + if (finalNumOfRows > *capacity) { + char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); + if (tmp == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } - int32_t newSize = (numOfRow1 + numOfRow2) * pColumnInfoData->info.bytes; - char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newSize); - if (tmp == NULL) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + pColumnInfoData->pData = tmp; + + if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) { + char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows)); + uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1); + memset(btmp + BitmapLen(numOfRow1), 0, extend); + + pColumnInfoData->nullbitmap = btmp; + } + + *capacity = finalNumOfRows; } - pColumnInfoData->pData = tmp; + doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); + int32_t offset = pColumnInfoData->info.bytes * numOfRow1; memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2); } @@ -350,29 +359,22 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { // if pIndexMap = NULL, merger one column by on column int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap) { assert(pSrc != NULL && pDest != NULL); + int32_t capacity = pDest->info.capacity; - int32_t numOfCols = pDest->info.numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { int32_t mapIndex = i; - if(pIndexMap) { + if (pIndexMap) { mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); } + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); - uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows); - uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows); - - int32_t newSize = oldLen + newLen; - char* tmp = taosMemoryRealloc(pCol2->pData, newSize); - if (tmp != NULL) { - pCol2->pData = tmp; - colDataMergeCol(pCol2, pDest->info.rows, pCol1, pSrc->info.rows); - } else { - return TSDB_CODE_VND_OUT_OF_MEMORY; - } + capacity = pDest->info.capacity; + colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); } + pDest->info.capacity = capacity; pDest->info.rows += pSrc->info.rows; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f55d2acf28..4fabe128a8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1197,7 +1197,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); + colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0], pfCtx->input.numOfRows); } else { colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); } @@ -1225,7 +1225,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); + colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1266,7 +1266,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; - colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); + colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -7168,7 +7168,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - // SOptrBasicInfo* pInfo = &pJoinInfo->binfo; SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes);