From a6217eec0323abdffb33de188e532cf8f3d7605a Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Fri, 31 May 2024 18:58:35 +0800 Subject: [PATCH 01/11] partition interval and limimt, dataload error --- source/libs/executor/src/groupoperator.c | 148 ++++++++++++++--------- source/libs/executor/src/scanoperator.c | 1 + 2 files changed, 94 insertions(+), 55 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9a31e993b2..c6a7804b9c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -53,6 +53,8 @@ typedef struct SPartitionOperatorInfo { int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data SArray* sortedGroupArray; // SDataGroupInfo sorted by group id + SArray* blockForNotLoaded; // SSDataBlock that data is not loaded + int32_t offsetForNotLoaded;// read offset for SSDataBlock that data is not loaded int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SExprSupp scalarSup; @@ -584,71 +586,86 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len); } - // number of rows - int32_t* rows = (int32_t*)pPage; + if (pBlock->info.dataLoad) { + // number of rows + int32_t* rows = (int32_t*)pPage; - size_t numOfCols = pOperator->exprSupp.numOfExprs; - for (int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + size_t numOfCols = pOperator->exprSupp.numOfExprs; + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - int32_t bytes = pColInfoData->info.bytes; - int32_t startOffset = pInfo->columnOffset[i]; + int32_t bytes = pColInfoData->info.bytes; + int32_t startOffset = pInfo->columnOffset[i]; - int32_t* columnLen = NULL; - int32_t contentLen = 0; + int32_t* columnLen = NULL; + int32_t contentLen = 0; - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - int32_t* offset = (int32_t*)((char*)pPage + startOffset); - columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); - char* data = (char*)((char*)columnLen + sizeof(int32_t)); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + int32_t* offset = (int32_t*)((char*)pPage + startOffset); + columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); + char* data = (char*)((char*)columnLen + sizeof(int32_t)); - if (colDataIsNull_s(pColInfoData, j)) { - offset[(*rows)] = -1; - contentLen = 0; - } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { - offset[*rows] = (*columnLen); - char* src = colDataGetData(pColInfoData, j); - int32_t dataLen = getJsonValueLen(src); + if (colDataIsNull_s(pColInfoData, j)) { + offset[(*rows)] = -1; + contentLen = 0; + } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + int32_t dataLen = getJsonValueLen(src); - memcpy(data + (*columnLen), src, dataLen); - int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); - ASSERT(v > 0); + memcpy(data + (*columnLen), src, dataLen); + int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); + ASSERT(v > 0); - contentLen = dataLen; + contentLen = dataLen; + } else { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + memcpy(data + (*columnLen), src, varDataTLen(src)); + int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); + ASSERT(v > 0); + + contentLen = varDataTLen(src); + } } else { - offset[*rows] = (*columnLen); - char* src = colDataGetData(pColInfoData, j); - memcpy(data + (*columnLen), src, varDataTLen(src)); - int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); - ASSERT(v > 0); + char* bitmap = (char*)pPage + startOffset; + columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); + char* data = (char*)columnLen + sizeof(int32_t); - contentLen = varDataTLen(src); + bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); + if (isNull) { + colDataSetNull_f(bitmap, (*rows)); + } else { + memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); + ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); + } + contentLen = bytes; } - } else { - char* bitmap = (char*)pPage + startOffset; - columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); - char* data = (char*)columnLen + sizeof(int32_t); - bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); - if (isNull) { - colDataSetNull_f(bitmap, (*rows)); - } else { - memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); - ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); - } - contentLen = bytes; + (*columnLen) += contentLen; } - (*columnLen) += contentLen; + (*rows) += 1; + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pBuf, pPage); + } else { + SSDataBlock* dataNotLoadBlock = createOneDataBlock(pBlock, true); + if (dataNotLoadBlock == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + if (pInfo->blockForNotLoaded == NULL) { + pInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); + pInfo->offsetForNotLoaded = 0; + } + dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; + dataNotLoadBlock->info.dataLoad = 0; + taosArrayInsert(pInfo->blockForNotLoaded, pInfo->blockForNotLoaded->size, &dataNotLoadBlock); + break; } - - (*rows) += 1; - - setBufPageDirty(pPage, true); - releaseBufPage(pInfo->pBuf, pPage); } } @@ -734,6 +751,14 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { } taosArrayClear(pInfo->sortedGroupArray); clearDiskbasedBuf(pInfo->pBuf); + if (pInfo->blockForNotLoaded) { + for (int32_t i = 0; i < pInfo->blockForNotLoaded->size; i++) { + SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, i); + blockDataDestroy(*pBlock); + } + taosArrayClear(pInfo->blockForNotLoaded); + pInfo->offsetForNotLoaded = 0; + } } static int compareDataGroupInfo(const void* group1, const void* group2) { @@ -747,6 +772,21 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1; } +static SSDataBlock* buildPartitionResultForNotLoadBlock(SOperatorInfo* pOperator) { + SPartitionOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (pInfo->blockForNotLoaded && pInfo->offsetForNotLoaded < pInfo->blockForNotLoaded->size) { + SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, pInfo->offsetForNotLoaded); + pInfo->offsetForNotLoaded++; + return *pBlock; + } else { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } +} + static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -757,12 +797,10 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { // try next group data - ++pInfo->groupIndex; - if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { - setOperatorCompleted(pOperator); - clearPartitionOperator(pInfo); - return NULL; + if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) { + return buildPartitionResultForNotLoadBlock(pOperator); } + ++pInfo->groupIndex; pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); pInfo->pageIndex = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 110aabf9b1..fb86e73a64 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -769,6 +769,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pTableScanInfo->pResBlock; bool hasNext = false; int32_t code = TSDB_CODE_SUCCESS; + pBlock->info.dataLoad = false; int64_t st = taosGetTimestampUs(); From c42e627a41eda9e2d79052ed13010c58408409cb Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 4 Jun 2024 11:39:47 +0800 Subject: [PATCH 02/11] test --- include/common/tcommon.h | 2 +- include/common/tdatablock.h | 1 + include/libs/scalar/filter.h | 2 +- source/common/src/tdatablock.c | 58 ++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbRead2.c | 8 +-- source/libs/executor/src/executil.c | 4 +- source/libs/executor/src/executorInt.c | 2 +- source/libs/executor/src/groupoperator.c | 6 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 2 +- source/libs/executor/src/tsort.c | 8 +-- source/libs/scalar/src/filter.c | 16 ++--- 12 files changed, 84 insertions(+), 27 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index d28477ae40..6d8c1b90f8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -239,7 +239,7 @@ typedef struct SDataBlockInfo { } SDataBlockInfo; typedef struct SSDataBlock { - SColumnDataAgg** pBlockAgg; + SColumnDataAgg* pBlockAgg; SArray* pDataBlock; // SArray SDataBlockInfo info; } SSDataBlock; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 197fa125f5..58fb3a6f4c 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -256,6 +256,7 @@ SSDataBlock* createDataBlock(); void* blockDataDestroy(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); +SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock); SSDataBlock* createSpecialDataBlock(EStreamType type); SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index c1ce1e6fd8..750179ee3b 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -58,7 +58,7 @@ extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar); extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo); extern void filterFreeInfo(SFilterInfo *info); -extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, int32_t numOfCols, int32_t numOfRows); +extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pColsAgg, int32_t numOfCols, int32_t numOfRows); /* condition split interface */ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index ac4811fb1b..7b4d20238c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -848,7 +848,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 if (pBlock->pBlockAgg == NULL) { isNull = colDataIsNull_s(pColData, j); } else { - isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); + isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]); } if (isNull) { @@ -1733,6 +1733,62 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { return pDstBlock; } +SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock) { + if (pDataBlock == NULL) { + return NULL; + } + + SSDataBlock* pDstBlock = createDataBlock(); + pDstBlock->info = pDataBlock->info; + + pDstBlock->info.rows = 0; + pDstBlock->info.capacity = 0; + pDstBlock->info.rowSize = 0; + pDstBlock->info.id = pDataBlock->info.id; + pDstBlock->info.blankFill = pDataBlock->info.blankFill; + + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; + blockDataAppendColInfo(pDstBlock, &colInfo); + } + + copyPkVal(&pDstBlock->info, &pDataBlock->info); + + int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pDstBlock); + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + } + + pDstBlock->info.rows = pDataBlock->info.rows; + pDstBlock->info.capacity = pDataBlock->info.rows; + + pDstBlock->pBlockAgg = pDataBlock->pBlockAgg; + pDataBlock->pBlockAgg = NULL; + // int numOfSlots = sizeof(pDataBlock->pBlockAgg)/POINTER_BYTES; + // if (pDataBlock->pBlockAgg != NULL) { + // pDstBlock->pBlockAgg = taosMemoryCalloc(numOfSlots, POINTER_BYTES); + // if (pDstBlock->pBlockAgg == NULL) { + // terrno = TSDB_CODE_OUT_OF_MEMORY; + // return NULL; + // } + // for (int j = 0; j < numOfSlots; ++j) { + // pDstBlock->pBlockAgg[j] = &(*pDataBlock->pBlockAgg)[j]; + // } + // } + + return pDstBlock; +} + SSDataBlock* createDataBlock() { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 99520f7c92..0771e4d5bf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4903,7 +4903,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ } int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) { - SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg; + SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg; int32_t code = 0; *allHave = false; @@ -4958,7 +4958,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, if (pResBlock->pBlockAgg == NULL) { size_t num = taosArrayGetSize(pResBlock->pDataBlock); - pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); + pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); } // do fill all null column value SMA info @@ -4970,13 +4970,13 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = &pSup->colAggArray.data[i]; if (pAgg->colId == pSup->colId[j]) { - pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg; + pResBlock->pBlockAgg[pSup->slotId[j]] = *pAgg; i += 1; j += 1; } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - pResBlock->pBlockAgg[pSup->slotId[j]] = NULL; + pResBlock->pBlockAgg[pSup->slotId[j]].colId = -1; *allHave = false; j += 1; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d06beebd6b..7672dd60aa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2354,7 +2354,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol for (int32_t i = 0; i < pSortGroupCols->size; ++i) { const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); - if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId]; if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { if (isNull[i] != 1) return 1; @@ -2389,7 +2389,7 @@ int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); if (pCol->slotId > pBlock->pDataBlock->size) continue; - if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId]; if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { isNull[i] = 1; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 43c04ca8d9..03b9a374c1 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -434,7 +434,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; - pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId]; + pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; if (pInput->pColumnDataAgg[j] == NULL) { pInput->colDataSMAIsSet = false; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c6a7804b9c..714c25aa3f 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -133,7 +133,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); @@ -189,7 +189,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); @@ -653,7 +653,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { setBufPageDirty(pPage, true); releaseBufPage(pInfo->pBuf, pPage); } else { - SSDataBlock* dataNotLoadBlock = createOneDataBlock(pBlock, true); + SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pBlock); if (dataNotLoadBlock == NULL) { T_LONG_JMP(pTaskInfo->env, terrno); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fb86e73a64..1bbc92005c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -220,7 +220,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return code; } -static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols, +static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg* pColsAgg, int32_t numOfCols, int32_t numOfRows) { if (pColsAgg == NULL || pFilterInfo == NULL) { return true; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b72811cdcc..7a70a3c2a0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -911,7 +911,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI struct SColumnDataAgg* pAgg = NULL; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; + pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { continue; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index daac98bbfc..1715065d24 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -651,7 +651,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType); } else { leftNull = - colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); + colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, &pLeftBlock->pBlockAgg[pOrder->slotId]); } } @@ -661,7 +661,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType); } else { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex, - pRightBlock->pBlockAgg[pOrder->slotId]); + &pRightBlock->pBlockAgg[pOrder->slotId]); } } @@ -742,7 +742,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType); } else { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, - pLeftBlock->pBlockAgg[i]); + &pLeftBlock->pBlockAgg[i]); } } @@ -752,7 +752,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType); } else { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, - pRightBlock->pBlockAgg[i]); + &pRightBlock->pBlockAgg[i]); } } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 57f2543691..ea80ffd076 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3782,7 +3782,7 @@ int32_t fltSclBuildRangeFromBlockSma(SFltSclColumnRange *colRange, SColumnDataAg return TSDB_CODE_SUCCESS; } -bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t numOfCols, int32_t numOfRows) { +bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) { if (info->scalarMode) { SArray *colRanges = info->sclCtx.fltSclRange; for (int32_t i = 0; i < taosArrayGetSize(colRanges); ++i) { @@ -3790,13 +3790,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t bool foundCol = false; int32_t j = 0; for (; j < numOfCols; ++j) { - if (pDataStatis[j] != NULL && pDataStatis[j]->colId == colRange->colNode->colId) { + if (pDataStatis[j].colId == colRange->colNode->colId) { foundCol = true; break; } } if (foundCol) { - SColumnDataAgg *pAgg = pDataStatis[j]; + SColumnDataAgg *pAgg = &pDataStatis[j]; SArray *points = taosArrayInit(2, sizeof(SFltSclPoint)); fltSclBuildRangeFromBlockSma(colRange, pAgg, numOfRows, points); qDebug("column data agg: nulls %d, rows %d, max %" PRId64 " min %" PRId64, pAgg->numOfNull, numOfRows, @@ -3833,7 +3833,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t int32_t index = -1; SFilterRangeCtx *ctx = info->colRange[k]; for (int32_t i = 0; i < numOfCols; ++i) { - if (pDataStatis[i] != NULL && pDataStatis[i]->colId == ctx->colId) { + if (pDataStatis[i].colId == ctx->colId) { index = i; break; } @@ -3849,13 +3849,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t break; } - if (pDataStatis[index]->numOfNull <= 0) { + if (pDataStatis[index].numOfNull <= 0) { if (ctx->isnull && !ctx->notnull && !ctx->isrange) { ret = false; break; } - } else if (pDataStatis[index]->numOfNull > 0) { - if (pDataStatis[index]->numOfNull == numOfRows) { + } else if (pDataStatis[index].numOfNull > 0) { + if (pDataStatis[index].numOfNull == numOfRows) { if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) { ret = false; break; @@ -3869,7 +3869,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t } } - SColumnDataAgg *pDataBlockst = pDataStatis[index]; + SColumnDataAgg *pDataBlockst = &pDataStatis[index]; SFilterRangeNode *r = ctx->rs; float minv = 0; From 7a8e87f8cd7f27378393a87e91ed9bed3f6e0ce8 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 4 Jun 2024 19:05:19 +0800 Subject: [PATCH 03/11] fix: slot id --- include/common/tdatablock.h | 3 +- source/common/src/tdatablock.c | 58 +--------------------- source/libs/executor/src/groupoperator.c | 63 +++++++++++++++++++++++- 3 files changed, 65 insertions(+), 59 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 58fb3a6f4c..722f18c52d 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -256,7 +256,6 @@ SSDataBlock* createDataBlock(); void* blockDataDestroy(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); -SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock); SSDataBlock* createSpecialDataBlock(EStreamType type); SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); @@ -283,6 +282,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* p void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); +void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7b4d20238c..7b0de5adcf 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -21,7 +21,7 @@ #define MALLOC_ALIGN_BYTES 32 -static void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); + int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { @@ -1733,62 +1733,6 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { return pDstBlock; } -SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock) { - if (pDataBlock == NULL) { - return NULL; - } - - SSDataBlock* pDstBlock = createDataBlock(); - pDstBlock->info = pDataBlock->info; - - pDstBlock->info.rows = 0; - pDstBlock->info.capacity = 0; - pDstBlock->info.rowSize = 0; - pDstBlock->info.id = pDataBlock->info.id; - pDstBlock->info.blankFill = pDataBlock->info.blankFill; - - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(pDstBlock, &colInfo); - } - - copyPkVal(&pDstBlock->info, &pDataBlock->info); - - int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - blockDataDestroy(pDstBlock); - return NULL; - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); - } - - pDstBlock->info.rows = pDataBlock->info.rows; - pDstBlock->info.capacity = pDataBlock->info.rows; - - pDstBlock->pBlockAgg = pDataBlock->pBlockAgg; - pDataBlock->pBlockAgg = NULL; - // int numOfSlots = sizeof(pDataBlock->pBlockAgg)/POINTER_BYTES; - // if (pDataBlock->pBlockAgg != NULL) { - // pDstBlock->pBlockAgg = taosMemoryCalloc(numOfSlots, POINTER_BYTES); - // if (pDstBlock->pBlockAgg == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; - // return NULL; - // } - // for (int j = 0; j < numOfSlots; ++j) { - // pDstBlock->pBlockAgg[j] = &(*pDataBlock->pBlockAgg)[j]; - // } - // } - - return pDstBlock; -} - SSDataBlock* createDataBlock() { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 714c25aa3f..c44ba4eecb 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -565,6 +565,67 @@ _error: return NULL; } +SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) { + if (pDataBlock == NULL) { + return NULL; + } + + SSDataBlock* pDstBlock = createDataBlock(); + pDstBlock->info = pDataBlock->info; + + pDstBlock->info.rows = 0; + pDstBlock->info.capacity = 0; + pDstBlock->info.rowSize = 0; + pDstBlock->info.id = pDataBlock->info.id; + pDstBlock->info.blankFill = pDataBlock->info.blankFill; + + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + + pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); + if (pDstBlock->pBlockAgg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + copyPkVal(&pDstBlock->info, &pDataBlock->info); + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); + if (pSrc) { + SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; + blockDataAppendColInfo(pDstBlock, &colInfo); + } + } + + int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pDstBlock); + return NULL; + } + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + if (slotId < numOfCols) { + pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i]; + pDstBlock->pBlockAgg[slotId].colId = i; + } + + // SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); + // if (pSrc) { + // SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); + // colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + // } + } + + pDstBlock->info.rows = pDataBlock->info.rows; + pDstBlock->info.capacity = pDataBlock->info.rows; + + return pDstBlock; +} + static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -653,7 +714,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { setBufPageDirty(pPage, true); releaseBufPage(pInfo->pBuf, pPage); } else { - SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pBlock); + SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock); if (dataNotLoadBlock == NULL) { T_LONG_JMP(pTaskInfo->env, terrno); } From 6d98a56778f114c7618fdc9bf3f9707fe5fcf714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Fri, 7 Jun 2024 16:49:29 +0800 Subject: [PATCH 04/11] fix group count --- source/libs/executor/inc/executorInt.h | 2 + source/libs/executor/src/groupoperator.c | 113 +++++++++++++---------- source/libs/function/src/builtinsimpl.c | 2 +- 3 files changed, 66 insertions(+), 51 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 592231f043..fba8193ee5 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -624,6 +624,8 @@ typedef struct SDataGroupInfo { uint64_t groupId; int64_t numOfRows; SArray* pPageList; + SArray* blockForNotLoaded; // SSDataBlock that data is not loaded + int32_t offsetForNotLoaded; // read offset for SSDataBlock that data is not loaded } SDataGroupInfo; typedef struct SWindowRowsSup { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c44ba4eecb..9473f650ae 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -53,8 +53,6 @@ typedef struct SPartitionOperatorInfo { int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data SArray* sortedGroupArray; // SDataGroupInfo sorted by group id - SArray* blockForNotLoaded; // SSDataBlock that data is not loaded - int32_t offsetForNotLoaded;// read offset for SSDataBlock that data is not loaded int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SExprSupp scalarSup; @@ -573,21 +571,14 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; - pDstBlock->info.rows = 0; + copyPkVal(&pDstBlock->info, &pDataBlock->info); + pDstBlock->info.rows = pDataBlock->info.rows; pDstBlock->info.capacity = 0; pDstBlock->info.rowSize = 0; pDstBlock->info.id = pDataBlock->info.id; pDstBlock->info.blankFill = pDataBlock->info.blankFill; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - - pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); - if (pDstBlock->pBlockAgg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - copyPkVal(&pDstBlock->info, &pDataBlock->info); - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; @@ -605,24 +596,25 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc return NULL; } - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - if (slotId < numOfCols) { - pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i]; - pDstBlock->pBlockAgg[slotId].colId = i; + if (pDataBlock->pBlockAgg != NULL) { + pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); + if (pDstBlock->pBlockAgg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataDestroy(pDstBlock); + return NULL; + } + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + if (slotId < numOfCols) { + pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i]; + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, slotId); + colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + } } - - // SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); - // if (pSrc) { - // SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); - // colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); - // } } - pDstBlock->info.rows = pDataBlock->info.rows; - pDstBlock->info.capacity = pDataBlock->info.rows; - return pDstBlock; } @@ -718,13 +710,14 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (dataNotLoadBlock == NULL) { T_LONG_JMP(pTaskInfo->env, terrno); } - if (pInfo->blockForNotLoaded == NULL) { - pInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); - pInfo->offsetForNotLoaded = 0; + if (pGroupInfo->blockForNotLoaded == NULL) { + pGroupInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); + pGroupInfo->offsetForNotLoaded = 0; } dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.dataLoad = 0; - taosArrayInsert(pInfo->blockForNotLoaded, pInfo->blockForNotLoaded->size, &dataNotLoadBlock); + pInfo->binfo.pRes->info.rows = pBlock->info.rows; + taosArrayInsert(pGroupInfo->blockForNotLoaded, pGroupInfo->blockForNotLoaded->size, &dataNotLoadBlock); break; } } @@ -808,18 +801,18 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { int32_t size = taosArrayGetSize(pInfo->sortedGroupArray); for (int32_t i = 0; i < size; i++) { SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i); + if (pGp->blockForNotLoaded) { + for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) { + SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i); + blockDataDestroy(*pBlock); + } + taosArrayClear(pGp->blockForNotLoaded); + pGp->offsetForNotLoaded = 0; + } taosArrayDestroy(pGp->pPageList); } taosArrayClear(pInfo->sortedGroupArray); clearDiskbasedBuf(pInfo->pBuf); - if (pInfo->blockForNotLoaded) { - for (int32_t i = 0; i < pInfo->blockForNotLoaded->size; i++) { - SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, i); - blockDataDestroy(*pBlock); - } - taosArrayClear(pInfo->blockForNotLoaded); - pInfo->offsetForNotLoaded = 0; - } } static int compareDataGroupInfo(const void* group1, const void* group2) { @@ -833,19 +826,13 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1; } -static SSDataBlock* buildPartitionResultForNotLoadBlock(SOperatorInfo* pOperator) { - SPartitionOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - if (pInfo->blockForNotLoaded && pInfo->offsetForNotLoaded < pInfo->blockForNotLoaded->size) { - SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, pInfo->offsetForNotLoaded); - pInfo->offsetForNotLoaded++; +static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) { + if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) { + SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded); + pGroupInfo->offsetForNotLoaded++; return *pBlock; - } else { - setOperatorCompleted(pOperator); - clearPartitionOperator(pInfo); - return NULL; } + return NULL; } static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { @@ -857,9 +844,15 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + if(pGroupInfo != NULL) { + SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); + if(ret != NULL) return ret; + } // try next group data if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) { - return buildPartitionResultForNotLoadBlock(pOperator); + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; } ++pInfo->groupIndex; @@ -873,6 +866,22 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, terrno); } + if (*(int32_t*)page == 0) { + SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); + if (ret != NULL) return ret; + releaseBufPage(pInfo->pBuf, page); + if (pInfo->pageIndex < taosArrayGetSize(pGroupInfo->pPageList)) { + pInfo->pageIndex += 1; + } else if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { + pInfo->groupIndex++; + pInfo->pageIndex = 0; + } else { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } + return buildPartitionResult(pOperator); + } blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); @@ -882,6 +891,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pInfo->binfo.pRes->info.dataLoad = 1; pInfo->orderedRows = 0; + } else if (pInfo->pOrderInfoArr == NULL) { + qError("Exception, remainRows not zero, but pOrderInfoArr is NULL"); } if (pInfo->pOrderInfoArr) { @@ -944,6 +955,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { while (pGroupIter != NULL) { SDataGroupInfo* pGroupInfo = pGroupIter; taosArrayPush(groupArray, pGroupInfo); + static int i = 0; + qInfo("groupArray push %p %p %d times", pGroupInfo, pGroupInfo->blockForNotLoaded, ++i); pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3fb298e1ea..36de40a38a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3628,7 +3628,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; - key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; + key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex); } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); From 16ceacac2b3d2fbb6c9b150eb93923617b46e42b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Fri, 7 Jun 2024 20:12:18 +0800 Subject: [PATCH 05/11] fix: no tag value --- source/libs/executor/src/groupoperator.c | 26 ++++++++++-------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9473f650ae..63be7eaa67 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -570,23 +570,16 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; - - copyPkVal(&pDstBlock->info, &pDataBlock->info); - pDstBlock->info.rows = pDataBlock->info.rows; pDstBlock->info.capacity = 0; pDstBlock->info.rowSize = 0; - pDstBlock->info.id = pDataBlock->info.id; - pDstBlock->info.blankFill = pDataBlock->info.blankFill; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); - if (pSrc) { - SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; - blockDataAppendColInfo(pDstBlock, &colInfo); - } + SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; + blockDataAppendColInfo(pDstBlock, &colInfo); } int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); @@ -606,15 +599,18 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - if (slotId < numOfCols) { - pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i]; - SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, slotId); - colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); - } + pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; } } + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); + colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + } + return pDstBlock; } From 5f347a0b223590f85b2e00ac4eaf015b74413fa0 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Wed, 12 Jun 2024 11:50:03 +0800 Subject: [PATCH 06/11] test case --- tests/parallel_test/cases.task | 5 + .../2-query/partition_limit_interval.py | 105 ++++++++++++++++++ tests/system-test/runAllOne.sh | 5 + 3 files changed, 115 insertions(+) create mode 100755 tests/system-test/2-query/partition_limit_interval.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 61687eeccd..bed1c20421 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -535,6 +535,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py @@ -779,6 +781,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 2 @@ -874,6 +877,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 @@ -971,6 +975,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 4 diff --git a/tests/system-test/2-query/partition_limit_interval.py b/tests/system-test/2-query/partition_limit_interval.py new file mode 100755 index 0000000000..287c7b7619 --- /dev/null +++ b/tests/system-test/2-query/partition_limit_interval.py @@ -0,0 +1,105 @@ +from util.log import * +from util.sql import * +from util.cases import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + self.row_nums = 1000 + self.tb_nums = 10 + self.ts = 1537146000000 + self.dbname = "db1" + self.stable = "meters" + + def prepare_datas(self, stb_name , tb_nums , row_nums, dbname="db" ): + tdSql.execute(f'''create database db1 MAXROWS 4096 MINROWS 100''') + tdSql.execute(f'''use {self.dbname}''') + tdSql.execute(f'''CREATE STABLE {self.stable} (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` TINYINT, `location` VARCHAR(16))''') + + for i in range(self.tb_nums): + tbname = f"{self.dbname}.sub_{self.stable}_{i}" + ts = self.ts + i*10000 + tdSql.execute(f"create table {tbname} using {self.dbname}.{self.stable} tags({i} ,'nchar_{i}')") + tdLog.info(f"create table {tbname} using {self.dbname}.{self.stable} tags({i} ,'nchar_{i}')") + if i < (self.tb_nums - 2): + for row in range(row_nums): + ts = self.ts + row*1000 + tdSql.execute(f"insert into {tbname} values({ts} , {row/10}, {215 + (row % 100)})") + + for null in range(5): + ts = self.ts + row_nums*1000 + null*1000 + tdSql.execute(f"insert into {tbname} values({ts} , NULL , NULL)") + + def basic_query(self): + tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by groupid interval(1d) limit 100") + tdSql.checkRows(8) + tdSql.checkData(0, 1, 1005) + + tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by tbname interval(1d) order by groupid limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, 1005) + tdSql.checkData(7, 0, 7) + tdSql.checkData(7, 1, 1005) + + tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) order by groupid limit 10") + tdSql.checkRows(8) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, 1005) + tdSql.checkData(7, 0, 7) + tdSql.checkData(7, 1, 1005) + + tdSql.query(f"select groupid, count(*), min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) order by groupid limit 10;") + tdSql.checkRows(8) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, 1005) + tdSql.checkData(0, 2, 0) + tdSql.checkData(7, 0, 7) + tdSql.checkData(7, 1, 1005) + tdSql.checkData(7, 2, 0) + + tdSql.query(f"select groupid, min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 1, 0) + + tdSql.query(f"select groupid, avg(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 10000;") + tdSql.checkRows(8) + tdSql.checkData(0, 1, tdSql.getData(7, 1)) + + tdSql.query(f"select current, avg(current) from {self.dbname}.{self.stable} partition by current interval(5d) limit 100;") + tdSql.checkData(0, 0, tdSql.getData(0, 1)) + + tdSql.query(f"select groupid, last(voltage), min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 10") + tdSql.checkRows(8) + tdSql.checkData(0, 1, tdSql.getData(7, 1)) + tdSql.checkData(0, 2, tdSql.getData(7, 2)) + + tdSql.query(f"select groupid, min(current), min(voltage) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 1, 0) + tdSql.checkData(0, 2, 215) + tdSql.checkData(7, 1, 0) + tdSql.checkData(7, 2, 215) + + tdSql.query(f"select groupid, min(voltage), min(current) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 2, 0) + tdSql.checkData(0, 1, 215) + tdSql.checkData(7, 2, 0) + tdSql.checkData(7, 1, 215) + + def run(self): + tdSql.prepare() + self.prepare_datas("stb",self.tb_nums,self.row_nums) + self.basic_query() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/runAllOne.sh b/tests/system-test/runAllOne.sh index 099ae1bbd3..79fc2cd363 100644 --- a/tests/system-test/runAllOne.sh +++ b/tests/system-test/runAllOne.sh @@ -235,6 +235,8 @@ python3 ./test.py -f 2-query/mavg.py -P python3 ./test.py -f 2-query/mavg.py -P -R python3 ./test.py -f 2-query/max_partition.py -P python3 ./test.py -f 2-query/max_partition.py -P -R +python3 ./test.py -f 2-query/partition_limit_interval.py -P +python3 ./test.py -f 2-query/partition_limit_interval.py -P -R python3 ./test.py -f 2-query/max_min_last_interval.py -P python3 ./test.py -f 2-query/last_row_interval.py -P python3 ./test.py -f 2-query/max.py -P @@ -481,6 +483,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 2 python3 ./test.py -f 2-query/function_null.py -P -Q 2 python3 ./test.py -f 2-query/count_partition.py -P -Q 2 python3 ./test.py -f 2-query/max_partition.py -P -Q 2 +python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 2 python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 2 python3 ./test.py -f 2-query/last_row_interval.py -P -Q 2 python3 ./test.py -f 2-query/last_row.py -P -Q 2 @@ -576,6 +579,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 3 python3 ./test.py -f 2-query/function_null.py -P -Q 3 python3 ./test.py -f 2-query/count_partition.py -P -Q 3 python3 ./test.py -f 2-query/max_partition.py -P -Q 3 +python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 3 python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 3 python3 ./test.py -f 2-query/last_row_interval.py -P -Q 3 python3 ./test.py -f 2-query/last_row.py -P -Q 3 @@ -673,6 +677,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 4 python3 ./test.py -f 2-query/function_null.py -P -Q 4 python3 ./test.py -f 2-query/count_partition.py -P -Q 4 python3 ./test.py -f 2-query/max_partition.py -P -Q 4 +python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 4 python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 4 python3 ./test.py -f 2-query/last_row_interval.py -P -Q 4 python3 ./test.py -f 2-query/last_row.py -P -Q 4 From 5e4107df9324220d600fa4fa44fc5d320b9edbcd Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Wed, 12 Jun 2024 13:22:03 +0800 Subject: [PATCH 07/11] test case --- tests/system-test/2-query/partition_limit_interval.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/partition_limit_interval.py b/tests/system-test/2-query/partition_limit_interval.py index 287c7b7619..dc0aabbfdd 100755 --- a/tests/system-test/2-query/partition_limit_interval.py +++ b/tests/system-test/2-query/partition_limit_interval.py @@ -15,9 +15,9 @@ class TDTestCase: self.stable = "meters" def prepare_datas(self, stb_name , tb_nums , row_nums, dbname="db" ): - tdSql.execute(f'''create database db1 MAXROWS 4096 MINROWS 100''') + tdSql.execute(f'''create database {self.dbname} MAXROWS 4096 MINROWS 100''') tdSql.execute(f'''use {self.dbname}''') - tdSql.execute(f'''CREATE STABLE {self.stable} (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` TINYINT, `location` VARCHAR(16))''') + tdSql.execute(f'''CREATE STABLE {self.dbname}.{self.stable} (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` TINYINT, `location` VARCHAR(16))''') for i in range(self.tb_nums): tbname = f"{self.dbname}.sub_{self.stable}_{i}" From ff24eaf94df39bace0398bd50241e6898d5a97c1 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Thu, 13 Jun 2024 14:13:44 +0800 Subject: [PATCH 08/11] fix: code review --- source/libs/executor/src/groupoperator.c | 50 ++++++++++-------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 63be7eaa67..e4c650a50b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -570,45 +570,39 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; + pDstBlock->info.id = pOperator->resultDataBlockId; pDstBlock->info.capacity = 0; pDstBlock->info.rowSize = 0; - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); - SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; - blockDataAppendColInfo(pDstBlock, &colInfo); - } - - int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - blockDataDestroy(pDstBlock); - return NULL; - } - - if (pDataBlock->pBlockAgg != NULL) { + size_t numOfCols = pOperator->exprSupp.numOfExprs; + if (pDataBlock->pBlockAgg) { pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); if (pDstBlock->pBlockAgg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; blockDataDestroy(pDstBlock); return NULL; } - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; - } } for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); + SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; + blockDataAppendColInfo(pDstBlock, &colInfo); + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); + int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pDstBlock); + return NULL; + } colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + + if (pDataBlock->pBlockAgg) { + pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; + } } return pDstBlock; @@ -707,13 +701,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { T_LONG_JMP(pTaskInfo->env, terrno); } if (pGroupInfo->blockForNotLoaded == NULL) { - pGroupInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); + pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*)); pGroupInfo->offsetForNotLoaded = 0; } dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.dataLoad = 0; pInfo->binfo.pRes->info.rows = pBlock->info.rows; - taosArrayInsert(pGroupInfo->blockForNotLoaded, pGroupInfo->blockForNotLoaded->size, &dataNotLoadBlock); + taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock); break; } } @@ -863,12 +857,10 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, terrno); } if (*(int32_t*)page == 0) { + releaseBufPage(pInfo->pBuf, page); SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); if (ret != NULL) return ret; - releaseBufPage(pInfo->pBuf, page); - if (pInfo->pageIndex < taosArrayGetSize(pGroupInfo->pPageList)) { - pInfo->pageIndex += 1; - } else if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { + if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { pInfo->groupIndex++; pInfo->pageIndex = 0; } else { @@ -951,8 +943,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { while (pGroupIter != NULL) { SDataGroupInfo* pGroupInfo = pGroupIter; taosArrayPush(groupArray, pGroupInfo); - static int i = 0; - qInfo("groupArray push %p %p %d times", pGroupInfo, pGroupInfo->blockForNotLoaded, ++i); pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter); } From 6da1215573e5f258951fce1248cab038719712b2 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Thu, 13 Jun 2024 14:47:21 +0800 Subject: [PATCH 09/11] fix blockid --- source/libs/executor/src/groupoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e4c650a50b..88d5ac4c3c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -570,7 +570,7 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SSDataBlock* pDstBlock = createDataBlock(); pDstBlock->info = pDataBlock->info; - pDstBlock->info.id = pOperator->resultDataBlockId; + pDstBlock->info.id.blockId = pOperator->resultDataBlockId; pDstBlock->info.capacity = 0; pDstBlock->info.rowSize = 0; From 19f6766c9a181aeb27ce80e3dd63c4165e99585e Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 13 Jun 2024 19:14:44 +0800 Subject: [PATCH 10/11] fix: blockAgg --- include/common/tdatablock.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 7 ++++++- source/libs/executor/src/executorInt.c | 2 +- source/libs/executor/src/groupoperator.c | 21 +++++++++++---------- source/libs/function/src/detail/tminmax.c | 23 ++++++++--------------- tests/army/test.py | 2 ++ 6 files changed, 29 insertions(+), 28 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 722f18c52d..ec998e9365 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -102,7 +102,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u return false; } - if (pColAgg != NULL) { + if (pColAgg != NULL && pColAgg->colId != -1) { if (pColAgg->numOfNull == totalRows) { ASSERT(pColumnInfoData->nullbitmap == NULL); return true; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0771e4d5bf..b4aca0a8a1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4959,6 +4959,12 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, if (pResBlock->pBlockAgg == NULL) { size_t num = taosArrayGetSize(pResBlock->pDataBlock); pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); + if (pResBlock->pBlockAgg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for(int i = 0; i < num; ++i) { + pResBlock->pBlockAgg[i].colId = -1; + } } // do fill all null column value SMA info @@ -4976,7 +4982,6 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - pResBlock->pBlockAgg[pSup->slotId[j]].colId = -1; *allHave = false; j += 1; } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 03b9a374c1..9326bfec43 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -435,7 +435,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; - if (pInput->pColumnDataAgg[j] == NULL) { + if (pInput->pColumnDataAgg[j]->colId == -1) { pInput->colDataSMAIsSet = false; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 88d5ac4c3c..0feaedaeef 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -591,17 +591,19 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; blockDataAppendColInfo(pDstBlock, &colInfo); + pDstBlock->pBlockAgg[i].colId = -1; SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); - int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - blockDataDestroy(pDstBlock); - return NULL; - } - colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); - - if (pDataBlock->pBlockAgg) { + if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) { pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; + } else { + int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pDstBlock); + return NULL; + } + + colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); } } @@ -706,7 +708,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.dataLoad = 0; - pInfo->binfo.pRes->info.rows = pBlock->info.rows; taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock); break; } diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index a6c91a57ce..653b8adad7 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -702,23 +702,16 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct } } -static int32_t saveRelatedTuple(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, int32_t index, void* tval) { +static int32_t saveRelatedTupleTag(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, void* tval) { SColumnInfoData* pCol = pInput->pData[0]; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; if (pCtx->subsidiaries.num > 0) { - index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - if (index >= 0) { - code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } + code = saveTupleData(pCtx, 0, pCtx->pSrcBlock, &pBuf->tuplePos); } - return code; } @@ -758,7 +751,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) pBuf->v = GET_INT64_VAL(tval); } - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } else { if (IS_SIGNED_NUMERIC_TYPE(type)) { int64_t prev = 0; @@ -767,7 +760,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_INT64_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { uint64_t prev = 0; @@ -776,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_UINT64_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } else if (type == TSDB_DATA_TYPE_DOUBLE) { double prev = 0; @@ -785,7 +778,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) double val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_DOUBLE_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } else if (type == TSDB_DATA_TYPE_FLOAT) { float prev = 0; @@ -794,7 +787,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) float val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_FLOAT_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } } diff --git a/tests/army/test.py b/tests/army/test.py index dda5d7d5b0..332d7f29c4 100644 --- a/tests/army/test.py +++ b/tests/army/test.py @@ -657,8 +657,10 @@ if __name__ == "__main__": conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) if fileName == "all": + tdLog.info("Procedures for testing runAllLinux") tdCases.runAllLinux(conn) else: + tdLog.info(f"Procedures for testing runOneLinux {fileName}") tdCases.runOneLinux(conn, fileName, replicaVar) # do restart option From 65044b39643a58baa25863319996ddcef422f2bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Sat, 15 Jun 2024 20:56:58 +0800 Subject: [PATCH 11/11] colId init --- source/libs/executor/src/groupoperator.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 0feaedaeef..710db0045d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -582,6 +582,9 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc blockDataDestroy(pDstBlock); return NULL; } + for(int i = 0; i < numOfCols; ++i) { + pDstBlock->pBlockAgg[i].colId = -1; + } } for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { @@ -591,7 +594,6 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; blockDataAppendColInfo(pDstBlock, &colInfo); - pDstBlock->pBlockAgg[i].colId = -1; SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) { pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];