fix(query): update the capacity for ssdatablock when merge new blocks.
This commit is contained in:
parent
0ebd90b549
commit
498cbc62fc
|
@ -70,7 +70,7 @@ typedef struct SDataBlockInfo {
|
||||||
uint64_t groupId; // no need to serialize
|
uint64_t groupId; // no need to serialize
|
||||||
int16_t numOfCols;
|
int16_t numOfCols;
|
||||||
int16_t hasVarCol;
|
int16_t hasVarCol;
|
||||||
int16_t capacity;
|
int32_t capacity;
|
||||||
} SDataBlockInfo;
|
} SDataBlockInfo;
|
||||||
|
|
||||||
typedef struct SSDataBlock {
|
typedef struct SSDataBlock {
|
||||||
|
|
|
@ -183,7 +183,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
||||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
|
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
|
||||||
uint32_t numOfRow2);
|
uint32_t numOfRow2);
|
||||||
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
|
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
|
||||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
|
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
|
||||||
|
|
|
@ -168,13 +168,6 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
|
||||||
|
|
||||||
uint32_t total = numOfRow1 + numOfRow2;
|
uint32_t total = numOfRow1 + numOfRow2;
|
||||||
|
|
||||||
if (BitmapLen(numOfRow1) < BitmapLen(total)) {
|
|
||||||
char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(total));
|
|
||||||
uint32_t extend = BitmapLen(total) - BitmapLen(numOfRow1);
|
|
||||||
memset(tmp + BitmapLen(numOfRow1), 0, extend);
|
|
||||||
pColumnInfoData->nullbitmap = tmp;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t remindBits = BitPos(numOfRow1);
|
uint32_t remindBits = BitPos(numOfRow1);
|
||||||
uint32_t shiftBits = 8 - remindBits;
|
uint32_t shiftBits = 8 - remindBits;
|
||||||
|
|
||||||
|
@ -209,10 +202,9 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
|
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
||||||
uint32_t numOfRow2) {
|
const SColumnInfoData* pSource, uint32_t numOfRow2) {
|
||||||
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
|
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
|
||||||
|
|
||||||
if (numOfRow2 == 0) {
|
if (numOfRow2 == 0) {
|
||||||
return numOfRow1;
|
return numOfRow1;
|
||||||
}
|
}
|
||||||
|
@ -221,14 +213,19 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
|
||||||
pColumnInfoData->hasNull = pSource->hasNull;
|
pColumnInfoData->hasNull = pSource->hasNull;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
// Handle the bitmap
|
// Handle the bitmap
|
||||||
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
|
if (finalNumOfRows > *capacity) {
|
||||||
if (p == NULL) {
|
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
if (p == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
*capacity = finalNumOfRows;
|
||||||
|
pColumnInfoData->varmeta.offset = (int32_t*)p;
|
||||||
}
|
}
|
||||||
|
|
||||||
pColumnInfoData->varmeta.offset = (int32_t*)p;
|
|
||||||
for (int32_t i = 0; i < numOfRow2; ++i) {
|
for (int32_t i = 0; i < numOfRow2; ++i) {
|
||||||
if (pSource->varmeta.offset[i] == -1) {
|
if (pSource->varmeta.offset[i] == -1) {
|
||||||
pColumnInfoData->varmeta.offset[i + numOfRow1] = -1;
|
pColumnInfoData->varmeta.offset[i + numOfRow1] = -1;
|
||||||
|
@ -253,15 +250,27 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
|
||||||
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
|
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
|
||||||
pColumnInfoData->varmeta.length = len + oldLen;
|
pColumnInfoData->varmeta.length = len + oldLen;
|
||||||
} else {
|
} else {
|
||||||
doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);
|
if (finalNumOfRows > *capacity) {
|
||||||
|
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows);
|
||||||
|
if (tmp == NULL) {
|
||||||
|
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t newSize = (numOfRow1 + numOfRow2) * pColumnInfoData->info.bytes;
|
pColumnInfoData->pData = tmp;
|
||||||
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newSize);
|
|
||||||
if (tmp == NULL) {
|
if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) {
|
||||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows));
|
||||||
|
uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1);
|
||||||
|
memset(btmp + BitmapLen(numOfRow1), 0, extend);
|
||||||
|
|
||||||
|
pColumnInfoData->nullbitmap = btmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
*capacity = finalNumOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
pColumnInfoData->pData = tmp;
|
doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);
|
||||||
|
|
||||||
int32_t offset = pColumnInfoData->info.bytes * numOfRow1;
|
int32_t offset = pColumnInfoData->info.bytes * numOfRow1;
|
||||||
memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2);
|
memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2);
|
||||||
}
|
}
|
||||||
|
@ -357,6 +366,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
|
||||||
if(pIndexMap) {
|
if(pIndexMap) {
|
||||||
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
||||||
|
|
||||||
|
@ -367,7 +377,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
|
||||||
char* tmp = taosMemoryRealloc(pCol2->pData, newSize);
|
char* tmp = taosMemoryRealloc(pCol2->pData, newSize);
|
||||||
if (tmp != NULL) {
|
if (tmp != NULL) {
|
||||||
pCol2->pData = tmp;
|
pCol2->pData = tmp;
|
||||||
colDataMergeCol(pCol2, pDest->info.rows, pCol1, pSrc->info.rows);
|
colDataMergeCol(pCol2, pDest->info.rows, &pDest->info.capacity, pCol1, pSrc->info.rows);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1199,7 +1199,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||||
colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
} else {
|
} else {
|
||||||
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
}
|
}
|
||||||
|
@ -1227,7 +1227,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
||||||
|
|
||||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||||
colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows);
|
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
|
||||||
|
|
||||||
numOfRows = dest.numOfRows;
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
|
@ -1268,7 +1268,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||||
colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows);
|
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
|
||||||
|
|
||||||
numOfRows = dest.numOfRows;
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
|
@ -7170,7 +7170,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
||||||
|
|
||||||
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
// SOptrBasicInfo* pInfo = &pJoinInfo->binfo;
|
|
||||||
|
|
||||||
SSDataBlock* pRes = pJoinInfo->pRes;
|
SSDataBlock* pRes = pJoinInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
|
Loading…
Reference in New Issue