fix: blockAgg

This commit is contained in:
factosea 2024-06-13 19:14:44 +08:00
parent 6da1215573
commit 19f6766c9a
6 changed files with 29 additions and 28 deletions

View File

@ -102,7 +102,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
return false; return false;
} }
if (pColAgg != NULL) { if (pColAgg != NULL && pColAgg->colId != -1) {
if (pColAgg->numOfNull == totalRows) { if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL); ASSERT(pColumnInfoData->nullbitmap == NULL);
return true; return true;

View File

@ -4959,6 +4959,12 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
if (pResBlock->pBlockAgg == NULL) { if (pResBlock->pBlockAgg == NULL) {
size_t num = taosArrayGetSize(pResBlock->pDataBlock); size_t num = taosArrayGetSize(pResBlock->pDataBlock);
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
if (pResBlock->pBlockAgg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < num; ++i) {
pResBlock->pBlockAgg[i].colId = -1;
}
} }
// do fill all null column value SMA info // do fill all null column value SMA info
@ -4976,7 +4982,6 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) { } else if (pAgg->colId < pSup->colId[j]) {
i += 1; i += 1;
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
pResBlock->pBlockAgg[pSup->slotId[j]].colId = -1;
*allHave = false; *allHave = false;
j += 1; j += 1;
} }

View File

@ -435,7 +435,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId; int32_t slotId = pFuncParam->pCol->slotId;
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
if (pInput->pColumnDataAgg[j] == NULL) { if (pInput->pColumnDataAgg[j]->colId == -1) {
pInput->colDataSMAIsSet = false; pInput->colDataSMAIsSet = false;
} }

View File

@ -591,17 +591,19 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc
SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
blockDataAppendColInfo(pDstBlock, &colInfo); blockDataAppendColInfo(pDstBlock, &colInfo);
pDstBlock->pBlockAgg[i].colId = -1;
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false); if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
if (pDataBlock->pBlockAgg) {
pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];
} else {
int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
} }
} }
@ -706,7 +708,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
dataNotLoadBlock->info.dataLoad = 0; dataNotLoadBlock->info.dataLoad = 0;
pInfo->binfo.pRes->info.rows = pBlock->info.rows;
taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock); taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
break; break;
} }

View File

@ -702,23 +702,16 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct
} }
} }
static int32_t saveRelatedTuple(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, int32_t index, void* tval) { static int32_t saveRelatedTupleTag(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, void* tval) {
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo);
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); code = saveTupleData(pCtx, 0, pCtx->pSrcBlock, &pBuf->tuplePos);
if (index >= 0) {
code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} }
return code; return code;
} }
@ -758,7 +751,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
pBuf->v = GET_INT64_VAL(tval); pBuf->v = GET_INT64_VAL(tval);
} }
code = saveRelatedTuple(pCtx, pInput, index, tval); code = saveRelatedTupleTag(pCtx, pInput, tval);
} else { } else {
if (IS_SIGNED_NUMERIC_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t prev = 0; int64_t prev = 0;
@ -767,7 +760,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
int64_t val = GET_INT64_VAL(tval); int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
GET_INT64_VAL(&pBuf->v) = val; GET_INT64_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval); code = saveRelatedTupleTag(pCtx, pInput, tval);
} }
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t prev = 0; uint64_t prev = 0;
@ -776,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
uint64_t val = GET_UINT64_VAL(tval); uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
GET_UINT64_VAL(&pBuf->v) = val; GET_UINT64_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval); code = saveRelatedTupleTag(pCtx, pInput, tval);
} }
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
double prev = 0; double prev = 0;
@ -785,7 +778,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
double val = GET_DOUBLE_VAL(tval); double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
GET_DOUBLE_VAL(&pBuf->v) = val; GET_DOUBLE_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval); code = saveRelatedTupleTag(pCtx, pInput, tval);
} }
} else if (type == TSDB_DATA_TYPE_FLOAT) { } else if (type == TSDB_DATA_TYPE_FLOAT) {
float prev = 0; float prev = 0;
@ -794,7 +787,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
float val = GET_DOUBLE_VAL(tval); float val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
GET_FLOAT_VAL(&pBuf->v) = val; GET_FLOAT_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval); code = saveRelatedTupleTag(pCtx, pInput, tval);
} }
} }
} }

View File

@ -657,8 +657,10 @@ if __name__ == "__main__":
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
if fileName == "all": if fileName == "all":
tdLog.info("Procedures for testing runAllLinux")
tdCases.runAllLinux(conn) tdCases.runAllLinux(conn)
else: else:
tdLog.info(f"Procedures for testing runOneLinux {fileName}")
tdCases.runOneLinux(conn, fileName, replicaVar) tdCases.runOneLinux(conn, fileName, replicaVar)
# do restart option # do restart option