diff --git a/include/common/tcommon.h b/include/common/tcommon.h index a5a805ff40..5714990dd5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -240,7 +240,7 @@ typedef struct SDataBlockInfo { } SDataBlockInfo; typedef struct SSDataBlock { - SColumnDataAgg** pBlockAgg; + SColumnDataAgg* pBlockAgg; SArray* pDataBlock; // SArray SDataBlockInfo info; } SSDataBlock; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 197fa125f5..ec998e9365 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -102,7 +102,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u return false; } - if (pColAgg != NULL) { + if (pColAgg != NULL && pColAgg->colId != -1) { if (pColAgg->numOfNull == totalRows) { ASSERT(pColumnInfoData->nullbitmap == NULL); return true; @@ -282,6 +282,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* p void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); +void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); + #ifdef __cplusplus } #endif diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index c1ce1e6fd8..750179ee3b 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -58,7 +58,7 @@ extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar); extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo); extern void filterFreeInfo(SFilterInfo *info); -extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, int32_t numOfCols, int32_t numOfRows); +extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pColsAgg, int32_t numOfCols, int32_t numOfRows); /* condition split interface */ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8e8c9d9a85..809721d606 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -21,7 +21,7 @@ #define MALLOC_ALIGN_BYTES 32 -static void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); + int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { @@ -848,7 +848,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 if (pBlock->pBlockAgg == NULL) { isNull = colDataIsNull_s(pColData, j); } else { - isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); + isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]); } if (isNull) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index fe44f01917..c6f13bc0a8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4897,7 +4897,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ } int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) { - SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg; + SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg; int32_t code = 0; *allHave = false; @@ -4952,7 +4952,13 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, if (pResBlock->pBlockAgg == NULL) { size_t num = taosArrayGetSize(pResBlock->pDataBlock); - pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); + pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); + if (pResBlock->pBlockAgg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for(int i = 0; i < num; ++i) { + pResBlock->pBlockAgg[i].colId = -1; + } } // do fill all null column value SMA info @@ -4964,13 +4970,12 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = &pSup->colAggArray.data[i]; if (pAgg->colId == pSup->colId[j]) { - pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg; + pResBlock->pBlockAgg[pSup->slotId[j]] = *pAgg; i += 1; j += 1; } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - pResBlock->pBlockAgg[pSup->slotId[j]] = NULL; *allHave = false; j += 1; } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 592231f043..fba8193ee5 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -624,6 +624,8 @@ typedef struct SDataGroupInfo { uint64_t groupId; int64_t numOfRows; SArray* pPageList; + SArray* blockForNotLoaded; // SSDataBlock that data is not loaded + int32_t offsetForNotLoaded; // read offset for SSDataBlock that data is not loaded } SDataGroupInfo; typedef struct SWindowRowsSup { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9ca681779d..cb2a75407a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2368,7 +2368,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol for (int32_t i = 0; i < pSortGroupCols->size; ++i) { const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); - if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId]; if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { if (isNull[i] != 1) return 1; @@ -2403,7 +2403,7 @@ int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); if (pCol->slotId > pBlock->pDataBlock->size) continue; - if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId]; if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { isNull[i] = 1; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 43c04ca8d9..9326bfec43 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -434,8 +434,8 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; - pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId]; - if (pInput->pColumnDataAgg[j] == NULL) { + pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; + if (pInput->pColumnDataAgg[j]->colId == -1) { pInput->colDataSMAIsSet = false; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9a31e993b2..710db0045d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -131,7 +131,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); @@ -187,7 +187,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); @@ -563,6 +563,55 @@ _error: return NULL; } +SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) { + if (pDataBlock == NULL) { + return NULL; + } + + SSDataBlock* pDstBlock = createDataBlock(); + pDstBlock->info = pDataBlock->info; + pDstBlock->info.id.blockId = pOperator->resultDataBlockId; + pDstBlock->info.capacity = 0; + pDstBlock->info.rowSize = 0; + + size_t numOfCols = pOperator->exprSupp.numOfExprs; + if (pDataBlock->pBlockAgg) { + pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); + if (pDstBlock->pBlockAgg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataDestroy(pDstBlock); + return NULL; + } + for(int i = 0; i < numOfCols; ++i) { + pDstBlock->pBlockAgg[i].colId = -1; + } + } + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); + SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info}; + blockDataAppendColInfo(pDstBlock, &colInfo); + + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); + if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) { + pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId]; + } else { + int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + blockDataDestroy(pDstBlock); + return NULL; + } + + colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); + } + } + + return pDstBlock; +} + static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -584,71 +633,86 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len); } - // number of rows - int32_t* rows = (int32_t*)pPage; + if (pBlock->info.dataLoad) { + // number of rows + int32_t* rows = (int32_t*)pPage; - size_t numOfCols = pOperator->exprSupp.numOfExprs; - for (int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + size_t numOfCols = pOperator->exprSupp.numOfExprs; + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - int32_t bytes = pColInfoData->info.bytes; - int32_t startOffset = pInfo->columnOffset[i]; + int32_t bytes = pColInfoData->info.bytes; + int32_t startOffset = pInfo->columnOffset[i]; - int32_t* columnLen = NULL; - int32_t contentLen = 0; + int32_t* columnLen = NULL; + int32_t contentLen = 0; - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - int32_t* offset = (int32_t*)((char*)pPage + startOffset); - columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); - char* data = (char*)((char*)columnLen + sizeof(int32_t)); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + int32_t* offset = (int32_t*)((char*)pPage + startOffset); + columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); + char* data = (char*)((char*)columnLen + sizeof(int32_t)); - if (colDataIsNull_s(pColInfoData, j)) { - offset[(*rows)] = -1; - contentLen = 0; - } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { - offset[*rows] = (*columnLen); - char* src = colDataGetData(pColInfoData, j); - int32_t dataLen = getJsonValueLen(src); + if (colDataIsNull_s(pColInfoData, j)) { + offset[(*rows)] = -1; + contentLen = 0; + } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + int32_t dataLen = getJsonValueLen(src); - memcpy(data + (*columnLen), src, dataLen); - int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); - ASSERT(v > 0); + memcpy(data + (*columnLen), src, dataLen); + int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); + ASSERT(v > 0); - contentLen = dataLen; + contentLen = dataLen; + } else { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + memcpy(data + (*columnLen), src, varDataTLen(src)); + int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); + ASSERT(v > 0); + + contentLen = varDataTLen(src); + } } else { - offset[*rows] = (*columnLen); - char* src = colDataGetData(pColInfoData, j); - memcpy(data + (*columnLen), src, varDataTLen(src)); - int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); - ASSERT(v > 0); + char* bitmap = (char*)pPage + startOffset; + columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); + char* data = (char*)columnLen + sizeof(int32_t); - contentLen = varDataTLen(src); + bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); + if (isNull) { + colDataSetNull_f(bitmap, (*rows)); + } else { + memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); + ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); + } + contentLen = bytes; } - } else { - char* bitmap = (char*)pPage + startOffset; - columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); - char* data = (char*)columnLen + sizeof(int32_t); - bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); - if (isNull) { - colDataSetNull_f(bitmap, (*rows)); - } else { - memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); - ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); - } - contentLen = bytes; + (*columnLen) += contentLen; } - (*columnLen) += contentLen; + (*rows) += 1; + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pBuf, pPage); + } else { + SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock); + if (dataNotLoadBlock == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + if (pGroupInfo->blockForNotLoaded == NULL) { + pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*)); + pGroupInfo->offsetForNotLoaded = 0; + } + dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; + dataNotLoadBlock->info.dataLoad = 0; + taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock); + break; } - - (*rows) += 1; - - setBufPageDirty(pPage, true); - releaseBufPage(pInfo->pBuf, pPage); } } @@ -730,6 +794,14 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { int32_t size = taosArrayGetSize(pInfo->sortedGroupArray); for (int32_t i = 0; i < size; i++) { SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i); + if (pGp->blockForNotLoaded) { + for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) { + SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i); + blockDataDestroy(*pBlock); + } + taosArrayClear(pGp->blockForNotLoaded); + pGp->offsetForNotLoaded = 0; + } taosArrayDestroy(pGp->pPageList); } taosArrayClear(pInfo->sortedGroupArray); @@ -747,6 +819,15 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1; } +static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) { + if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) { + SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded); + pGroupInfo->offsetForNotLoaded++; + return *pBlock; + } + return NULL; +} + static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -756,13 +837,17 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + if(pGroupInfo != NULL) { + SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); + if(ret != NULL) return ret; + } // try next group data - ++pInfo->groupIndex; - if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { + if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) { setOperatorCompleted(pOperator); clearPartitionOperator(pInfo); return NULL; } + ++pInfo->groupIndex; pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); pInfo->pageIndex = 0; @@ -774,6 +859,20 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, terrno); } + if (*(int32_t*)page == 0) { + releaseBufPage(pInfo->pBuf, page); + SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); + if (ret != NULL) return ret; + if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) { + pInfo->groupIndex++; + pInfo->pageIndex = 0; + } else { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } + return buildPartitionResult(pOperator); + } blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); @@ -783,6 +882,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pInfo->binfo.pRes->info.dataLoad = 1; pInfo->orderedRows = 0; + } else if (pInfo->pOrderInfoArr == NULL) { + qError("Exception, remainRows not zero, but pOrderInfoArr is NULL"); } if (pInfo->pOrderInfoArr) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ec40bceb5e..1b90088605 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -220,7 +220,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return code; } -static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols, +static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg* pColsAgg, int32_t numOfCols, int32_t numOfRows) { if (pColsAgg == NULL || pFilterInfo == NULL) { return true; @@ -769,6 +769,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pTableScanInfo->pResBlock; bool hasNext = false; int32_t code = TSDB_CODE_SUCCESS; + pBlock->info.dataLoad = false; int64_t st = taosGetTimestampUs(); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b72811cdcc..7a70a3c2a0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -911,7 +911,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI struct SColumnDataAgg* pAgg = NULL; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; + pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { continue; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 8ab388830f..d9bcc954a4 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -651,7 +651,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType); } else { leftNull = - colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); + colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, &pLeftBlock->pBlockAgg[pOrder->slotId]); } } @@ -661,7 +661,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType); } else { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex, - pRightBlock->pBlockAgg[pOrder->slotId]); + &pRightBlock->pBlockAgg[pOrder->slotId]); } } @@ -742,7 +742,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType); } else { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, - pLeftBlock->pBlockAgg[i]); + &pLeftBlock->pBlockAgg[i]); } } @@ -752,7 +752,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType); } else { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, - pRightBlock->pBlockAgg[i]); + &pRightBlock->pBlockAgg[i]); } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 699865a9bf..1aa92479b9 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3631,7 +3631,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex); ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); key.groupId = pSrcBlock->info.id.groupId; - key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; + key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex); } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index a6c91a57ce..653b8adad7 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -702,23 +702,16 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct } } -static int32_t saveRelatedTuple(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, int32_t index, void* tval) { +static int32_t saveRelatedTupleTag(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, void* tval) { SColumnInfoData* pCol = pInput->pData[0]; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; if (pCtx->subsidiaries.num > 0) { - index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - if (index >= 0) { - code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } + code = saveTupleData(pCtx, 0, pCtx->pSrcBlock, &pBuf->tuplePos); } - return code; } @@ -758,7 +751,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) pBuf->v = GET_INT64_VAL(tval); } - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } else { if (IS_SIGNED_NUMERIC_TYPE(type)) { int64_t prev = 0; @@ -767,7 +760,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_INT64_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { uint64_t prev = 0; @@ -776,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_UINT64_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } else if (type == TSDB_DATA_TYPE_DOUBLE) { double prev = 0; @@ -785,7 +778,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) double val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_DOUBLE_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } else if (type == TSDB_DATA_TYPE_FLOAT) { float prev = 0; @@ -794,7 +787,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) float val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { GET_FLOAT_VAL(&pBuf->v) = val; - code = saveRelatedTuple(pCtx, pInput, index, tval); + code = saveRelatedTupleTag(pCtx, pInput, tval); } } } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index b9cf5d48f0..72e38c7a0d 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3782,7 +3782,7 @@ int32_t fltSclBuildRangeFromBlockSma(SFltSclColumnRange *colRange, SColumnDataAg return TSDB_CODE_SUCCESS; } -bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t numOfCols, int32_t numOfRows) { +bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) { if (info->scalarMode) { SArray *colRanges = info->sclCtx.fltSclRange; for (int32_t i = 0; i < taosArrayGetSize(colRanges); ++i) { @@ -3790,13 +3790,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t bool foundCol = false; int32_t j = 0; for (; j < numOfCols; ++j) { - if (pDataStatis[j] != NULL && pDataStatis[j]->colId == colRange->colNode->colId) { + if (pDataStatis[j].colId == colRange->colNode->colId) { foundCol = true; break; } } if (foundCol) { - SColumnDataAgg *pAgg = pDataStatis[j]; + SColumnDataAgg *pAgg = &pDataStatis[j]; SArray *points = taosArrayInit(2, sizeof(SFltSclPoint)); fltSclBuildRangeFromBlockSma(colRange, pAgg, numOfRows, points); qDebug("column data agg: nulls %d, rows %d, max %" PRId64 " min %" PRId64, pAgg->numOfNull, numOfRows, @@ -3833,7 +3833,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t int32_t index = -1; SFilterRangeCtx *ctx = info->colRange[k]; for (int32_t i = 0; i < numOfCols; ++i) { - if (pDataStatis[i] != NULL && pDataStatis[i]->colId == ctx->colId) { + if (pDataStatis[i].colId == ctx->colId) { index = i; break; } @@ -3849,13 +3849,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t break; } - if (pDataStatis[index]->numOfNull <= 0) { + if (pDataStatis[index].numOfNull <= 0) { if (ctx->isnull && !ctx->notnull && !ctx->isrange) { ret = false; break; } - } else if (pDataStatis[index]->numOfNull > 0) { - if (pDataStatis[index]->numOfNull == numOfRows) { + } else if (pDataStatis[index].numOfNull > 0) { + if (pDataStatis[index].numOfNull == numOfRows) { if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) { ret = false; break; @@ -3869,7 +3869,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t } } - SColumnDataAgg *pDataBlockst = pDataStatis[index]; + SColumnDataAgg *pDataBlockst = &pDataStatis[index]; SFilterRangeNode *r = ctx->rs; float minv = 0; diff --git a/tests/army/test.py b/tests/army/test.py index dda5d7d5b0..332d7f29c4 100644 --- a/tests/army/test.py +++ b/tests/army/test.py @@ -657,8 +657,10 @@ if __name__ == "__main__": conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) if fileName == "all": + tdLog.info("Procedures for testing runAllLinux") tdCases.runAllLinux(conn) else: + tdLog.info(f"Procedures for testing runOneLinux {fileName}") tdCases.runOneLinux(conn, fileName, replicaVar) # do restart option diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 307260cf6a..6114a560dc 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -536,6 +536,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py @@ -780,6 +782,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 2 @@ -875,6 +878,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 @@ -972,6 +976,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 4 diff --git a/tests/system-test/2-query/partition_limit_interval.py b/tests/system-test/2-query/partition_limit_interval.py new file mode 100755 index 0000000000..dc0aabbfdd --- /dev/null +++ b/tests/system-test/2-query/partition_limit_interval.py @@ -0,0 +1,105 @@ +from util.log import * +from util.sql import * +from util.cases import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + self.row_nums = 1000 + self.tb_nums = 10 + self.ts = 1537146000000 + self.dbname = "db1" + self.stable = "meters" + + def prepare_datas(self, stb_name , tb_nums , row_nums, dbname="db" ): + tdSql.execute(f'''create database {self.dbname} MAXROWS 4096 MINROWS 100''') + tdSql.execute(f'''use {self.dbname}''') + tdSql.execute(f'''CREATE STABLE {self.dbname}.{self.stable} (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` TINYINT, `location` VARCHAR(16))''') + + for i in range(self.tb_nums): + tbname = f"{self.dbname}.sub_{self.stable}_{i}" + ts = self.ts + i*10000 + tdSql.execute(f"create table {tbname} using {self.dbname}.{self.stable} tags({i} ,'nchar_{i}')") + tdLog.info(f"create table {tbname} using {self.dbname}.{self.stable} tags({i} ,'nchar_{i}')") + if i < (self.tb_nums - 2): + for row in range(row_nums): + ts = self.ts + row*1000 + tdSql.execute(f"insert into {tbname} values({ts} , {row/10}, {215 + (row % 100)})") + + for null in range(5): + ts = self.ts + row_nums*1000 + null*1000 + tdSql.execute(f"insert into {tbname} values({ts} , NULL , NULL)") + + def basic_query(self): + tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by groupid interval(1d) limit 100") + tdSql.checkRows(8) + tdSql.checkData(0, 1, 1005) + + tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by tbname interval(1d) order by groupid limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, 1005) + tdSql.checkData(7, 0, 7) + tdSql.checkData(7, 1, 1005) + + tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) order by groupid limit 10") + tdSql.checkRows(8) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, 1005) + tdSql.checkData(7, 0, 7) + tdSql.checkData(7, 1, 1005) + + tdSql.query(f"select groupid, count(*), min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) order by groupid limit 10;") + tdSql.checkRows(8) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, 1005) + tdSql.checkData(0, 2, 0) + tdSql.checkData(7, 0, 7) + tdSql.checkData(7, 1, 1005) + tdSql.checkData(7, 2, 0) + + tdSql.query(f"select groupid, min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 1, 0) + + tdSql.query(f"select groupid, avg(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 10000;") + tdSql.checkRows(8) + tdSql.checkData(0, 1, tdSql.getData(7, 1)) + + tdSql.query(f"select current, avg(current) from {self.dbname}.{self.stable} partition by current interval(5d) limit 100;") + tdSql.checkData(0, 0, tdSql.getData(0, 1)) + + tdSql.query(f"select groupid, last(voltage), min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 10") + tdSql.checkRows(8) + tdSql.checkData(0, 1, tdSql.getData(7, 1)) + tdSql.checkData(0, 2, tdSql.getData(7, 2)) + + tdSql.query(f"select groupid, min(current), min(voltage) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 1, 0) + tdSql.checkData(0, 2, 215) + tdSql.checkData(7, 1, 0) + tdSql.checkData(7, 2, 215) + + tdSql.query(f"select groupid, min(voltage), min(current) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) limit 100;") + tdSql.checkRows(8) + tdSql.checkData(0, 2, 0) + tdSql.checkData(0, 1, 215) + tdSql.checkData(7, 2, 0) + tdSql.checkData(7, 1, 215) + + def run(self): + tdSql.prepare() + self.prepare_datas("stb",self.tb_nums,self.row_nums) + self.basic_query() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/runAllOne.sh b/tests/system-test/runAllOne.sh index 099ae1bbd3..79fc2cd363 100644 --- a/tests/system-test/runAllOne.sh +++ b/tests/system-test/runAllOne.sh @@ -235,6 +235,8 @@ python3 ./test.py -f 2-query/mavg.py -P python3 ./test.py -f 2-query/mavg.py -P -R python3 ./test.py -f 2-query/max_partition.py -P python3 ./test.py -f 2-query/max_partition.py -P -R +python3 ./test.py -f 2-query/partition_limit_interval.py -P +python3 ./test.py -f 2-query/partition_limit_interval.py -P -R python3 ./test.py -f 2-query/max_min_last_interval.py -P python3 ./test.py -f 2-query/last_row_interval.py -P python3 ./test.py -f 2-query/max.py -P @@ -481,6 +483,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 2 python3 ./test.py -f 2-query/function_null.py -P -Q 2 python3 ./test.py -f 2-query/count_partition.py -P -Q 2 python3 ./test.py -f 2-query/max_partition.py -P -Q 2 +python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 2 python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 2 python3 ./test.py -f 2-query/last_row_interval.py -P -Q 2 python3 ./test.py -f 2-query/last_row.py -P -Q 2 @@ -576,6 +579,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 3 python3 ./test.py -f 2-query/function_null.py -P -Q 3 python3 ./test.py -f 2-query/count_partition.py -P -Q 3 python3 ./test.py -f 2-query/max_partition.py -P -Q 3 +python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 3 python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 3 python3 ./test.py -f 2-query/last_row_interval.py -P -Q 3 python3 ./test.py -f 2-query/last_row.py -P -Q 3 @@ -673,6 +677,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 4 python3 ./test.py -f 2-query/function_null.py -P -Q 4 python3 ./test.py -f 2-query/count_partition.py -P -Q 4 python3 ./test.py -f 2-query/max_partition.py -P -Q 4 +python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 4 python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 4 python3 ./test.py -f 2-query/last_row_interval.py -P -Q 4 python3 ./test.py -f 2-query/last_row.py -P -Q 4