This commit is contained in:
factosea 2024-06-04 11:39:47 +08:00
parent a6217eec03
commit c42e627a41
12 changed files with 84 additions and 27 deletions

View File

@ -239,7 +239,7 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg** pBlockAgg; SColumnDataAgg* pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData> SArray* pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;

View File

@ -256,6 +256,7 @@ SSDataBlock* createDataBlock();
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock);
SSDataBlock* createSpecialDataBlock(EStreamType type); SSDataBlock* createSpecialDataBlock(EStreamType type);
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx);

View File

@ -58,7 +58,7 @@ extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict
extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar); extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo); extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info); extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, int32_t numOfCols, int32_t numOfRows); extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pColsAgg, int32_t numOfCols, int32_t numOfRows);
/* condition split interface */ /* condition split interface */
int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond,

View File

@ -848,7 +848,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
if (pBlock->pBlockAgg == NULL) { if (pBlock->pBlockAgg == NULL) {
isNull = colDataIsNull_s(pColData, j); isNull = colDataIsNull_s(pColData, j);
} else { } else {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
} }
if (isNull) { if (isNull) {
@ -1733,6 +1733,62 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
return pDstBlock; return pDstBlock;
} }
SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock) {
if (pDataBlock == NULL) {
return NULL;
}
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info;
pDstBlock->info.rows = 0;
pDstBlock->info.capacity = 0;
pDstBlock->info.rowSize = 0;
pDstBlock->info.id = pDataBlock->info.id;
pDstBlock->info.blankFill = pDataBlock->info.blankFill;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
blockDataAppendColInfo(pDstBlock, &colInfo);
}
copyPkVal(&pDstBlock->info, &pDataBlock->info);
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
}
pDstBlock->info.rows = pDataBlock->info.rows;
pDstBlock->info.capacity = pDataBlock->info.rows;
pDstBlock->pBlockAgg = pDataBlock->pBlockAgg;
pDataBlock->pBlockAgg = NULL;
// int numOfSlots = sizeof(pDataBlock->pBlockAgg)/POINTER_BYTES;
// if (pDataBlock->pBlockAgg != NULL) {
// pDstBlock->pBlockAgg = taosMemoryCalloc(numOfSlots, POINTER_BYTES);
// if (pDstBlock->pBlockAgg == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return NULL;
// }
// for (int j = 0; j < numOfSlots; ++j) {
// pDstBlock->pBlockAgg[j] = &(*pDataBlock->pBlockAgg)[j];
// }
// }
return pDstBlock;
}
SSDataBlock* createDataBlock() { SSDataBlock* createDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) { if (pBlock == NULL) {

View File

@ -4903,7 +4903,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} }
int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) { int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) {
SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg; SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg;
int32_t code = 0; int32_t code = 0;
*allHave = false; *allHave = false;
@ -4958,7 +4958,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
if (pResBlock->pBlockAgg == NULL) { if (pResBlock->pBlockAgg == NULL) {
size_t num = taosArrayGetSize(pResBlock->pDataBlock); size_t num = taosArrayGetSize(pResBlock->pDataBlock);
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
} }
// do fill all null column value SMA info // do fill all null column value SMA info
@ -4970,13 +4970,13 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = &pSup->colAggArray.data[i]; SColumnDataAgg* pAgg = &pSup->colAggArray.data[i];
if (pAgg->colId == pSup->colId[j]) { if (pAgg->colId == pSup->colId[j]) {
pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg; pResBlock->pBlockAgg[pSup->slotId[j]] = *pAgg;
i += 1; i += 1;
j += 1; j += 1;
} else if (pAgg->colId < pSup->colId[j]) { } else if (pAgg->colId < pSup->colId[j]) {
i += 1; i += 1;
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
pResBlock->pBlockAgg[pSup->slotId[j]] = NULL; pResBlock->pBlockAgg[pSup->slotId[j]].colId = -1;
*allHave = false; *allHave = false;
j += 1; j += 1;
} }

View File

@ -2354,7 +2354,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
for (int32_t i = 0; i < pSortGroupCols->size; ++i) { for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
if (isNull[i] != 1) return 1; if (isNull[i] != 1) return 1;
@ -2389,7 +2389,7 @@ int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock*
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pCol->slotId > pBlock->pDataBlock->size) continue; if (pCol->slotId > pBlock->pDataBlock->size) continue;
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
isNull[i] = 1; isNull[i] = 1;

View File

@ -434,7 +434,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId; int32_t slotId = pFuncParam->pCol->slotId;
pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId]; pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
if (pInput->pColumnDataAgg[j] == NULL) { if (pInput->pColumnDataAgg[j] == NULL) {
pInput->colDataSMAIsSet = false; pInput->colDataSMAIsSet = false;
} }

View File

@ -133,7 +133,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
SColumn* pCol = taosArrayGet(pGroupCols, i); SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) { if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
} }
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
@ -189,7 +189,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
} }
if (pBlock->pBlockAgg != NULL) { if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
} }
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
@ -653,7 +653,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage); releaseBufPage(pInfo->pBuf, pPage);
} else { } else {
SSDataBlock* dataNotLoadBlock = createOneDataBlock(pBlock, true); SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pBlock);
if (dataNotLoadBlock == NULL) { if (dataNotLoadBlock == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }

View File

@ -220,7 +220,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
return code; return code;
} }
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols, static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg* pColsAgg, int32_t numOfCols,
int32_t numOfRows) { int32_t numOfRows) {
if (pColsAgg == NULL || pFilterInfo == NULL) { if (pColsAgg == NULL || pFilterInfo == NULL) {
return true; return true;

View File

@ -911,7 +911,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
struct SColumnDataAgg* pAgg = NULL; struct SColumnDataAgg* pAgg = NULL;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
continue; continue;
} }

View File

@ -651,7 +651,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType); leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType);
} else { } else {
leftNull = leftNull =
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, &pLeftBlock->pBlockAgg[pOrder->slotId]);
} }
} }
@ -661,7 +661,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType); rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType);
} else { } else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex, rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex,
pRightBlock->pBlockAgg[pOrder->slotId]); &pRightBlock->pBlockAgg[pOrder->slotId]);
} }
} }
@ -742,7 +742,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType); leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
} else { } else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]); &pLeftBlock->pBlockAgg[i]);
} }
} }
@ -752,7 +752,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType); rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
} else { } else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
pRightBlock->pBlockAgg[i]); &pRightBlock->pBlockAgg[i]);
} }
} }

View File

@ -3782,7 +3782,7 @@ int32_t fltSclBuildRangeFromBlockSma(SFltSclColumnRange *colRange, SColumnDataAg
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t numOfCols, int32_t numOfRows) { bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
if (info->scalarMode) { if (info->scalarMode) {
SArray *colRanges = info->sclCtx.fltSclRange; SArray *colRanges = info->sclCtx.fltSclRange;
for (int32_t i = 0; i < taosArrayGetSize(colRanges); ++i) { for (int32_t i = 0; i < taosArrayGetSize(colRanges); ++i) {
@ -3790,13 +3790,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
bool foundCol = false; bool foundCol = false;
int32_t j = 0; int32_t j = 0;
for (; j < numOfCols; ++j) { for (; j < numOfCols; ++j) {
if (pDataStatis[j] != NULL && pDataStatis[j]->colId == colRange->colNode->colId) { if (pDataStatis[j].colId == colRange->colNode->colId) {
foundCol = true; foundCol = true;
break; break;
} }
} }
if (foundCol) { if (foundCol) {
SColumnDataAgg *pAgg = pDataStatis[j]; SColumnDataAgg *pAgg = &pDataStatis[j];
SArray *points = taosArrayInit(2, sizeof(SFltSclPoint)); SArray *points = taosArrayInit(2, sizeof(SFltSclPoint));
fltSclBuildRangeFromBlockSma(colRange, pAgg, numOfRows, points); fltSclBuildRangeFromBlockSma(colRange, pAgg, numOfRows, points);
qDebug("column data agg: nulls %d, rows %d, max %" PRId64 " min %" PRId64, pAgg->numOfNull, numOfRows, qDebug("column data agg: nulls %d, rows %d, max %" PRId64 " min %" PRId64, pAgg->numOfNull, numOfRows,
@ -3833,7 +3833,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
int32_t index = -1; int32_t index = -1;
SFilterRangeCtx *ctx = info->colRange[k]; SFilterRangeCtx *ctx = info->colRange[k];
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
if (pDataStatis[i] != NULL && pDataStatis[i]->colId == ctx->colId) { if (pDataStatis[i].colId == ctx->colId) {
index = i; index = i;
break; break;
} }
@ -3849,13 +3849,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
break; break;
} }
if (pDataStatis[index]->numOfNull <= 0) { if (pDataStatis[index].numOfNull <= 0) {
if (ctx->isnull && !ctx->notnull && !ctx->isrange) { if (ctx->isnull && !ctx->notnull && !ctx->isrange) {
ret = false; ret = false;
break; break;
} }
} else if (pDataStatis[index]->numOfNull > 0) { } else if (pDataStatis[index].numOfNull > 0) {
if (pDataStatis[index]->numOfNull == numOfRows) { if (pDataStatis[index].numOfNull == numOfRows) {
if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) { if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) {
ret = false; ret = false;
break; break;
@ -3869,7 +3869,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
} }
} }
SColumnDataAgg *pDataBlockst = pDataStatis[index]; SColumnDataAgg *pDataBlockst = &pDataStatis[index];
SFilterRangeNode *r = ctx->rs; SFilterRangeNode *r = ctx->rs;
float minv = 0; float minv = 0;