From a6217eec0323abdffb33de188e532cf8f3d7605a Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Fri, 31 May 2024 18:58:35 +0800 Subject: [PATCH] partition interval and limimt, dataload error --- source/libs/executor/src/groupoperator.c | 148 ++++++++++++++--------- source/libs/executor/src/scanoperator.c | 1 + 2 files changed, 94 insertions(+), 55 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9a31e993b2..c6a7804b9c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -53,6 +53,8 @@ typedef struct SPartitionOperatorInfo { int32_t rowCapacity; // maximum number of rows for each buffer page int32_t* columnOffset; // start position for each column data SArray* sortedGroupArray; // SDataGroupInfo sorted by group id + SArray* blockForNotLoaded; // SSDataBlock that data is not loaded + int32_t offsetForNotLoaded;// read offset for SSDataBlock that data is not loaded int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SExprSupp scalarSup; @@ -584,71 +586,86 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len); } - // number of rows - int32_t* rows = (int32_t*)pPage; + if (pBlock->info.dataLoad) { + // number of rows + int32_t* rows = (int32_t*)pPage; - size_t numOfCols = pOperator->exprSupp.numOfExprs; - for (int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; - int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + size_t numOfCols = pOperator->exprSupp.numOfExprs; + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - int32_t bytes = pColInfoData->info.bytes; - int32_t startOffset = pInfo->columnOffset[i]; + int32_t bytes = pColInfoData->info.bytes; + int32_t startOffset = pInfo->columnOffset[i]; - int32_t* columnLen = NULL; - int32_t contentLen = 0; + int32_t* columnLen = NULL; + int32_t contentLen = 0; - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - int32_t* offset = (int32_t*)((char*)pPage + startOffset); - columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); - char* data = (char*)((char*)columnLen + sizeof(int32_t)); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + int32_t* offset = (int32_t*)((char*)pPage + startOffset); + columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); + char* data = (char*)((char*)columnLen + sizeof(int32_t)); - if (colDataIsNull_s(pColInfoData, j)) { - offset[(*rows)] = -1; - contentLen = 0; - } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { - offset[*rows] = (*columnLen); - char* src = colDataGetData(pColInfoData, j); - int32_t dataLen = getJsonValueLen(src); + if (colDataIsNull_s(pColInfoData, j)) { + offset[(*rows)] = -1; + contentLen = 0; + } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + int32_t dataLen = getJsonValueLen(src); - memcpy(data + (*columnLen), src, dataLen); - int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); - ASSERT(v > 0); + memcpy(data + (*columnLen), src, dataLen); + int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); + ASSERT(v > 0); - contentLen = dataLen; + contentLen = dataLen; + } else { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + memcpy(data + (*columnLen), src, varDataTLen(src)); + int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); + ASSERT(v > 0); + + contentLen = varDataTLen(src); + } } else { - offset[*rows] = (*columnLen); - char* src = colDataGetData(pColInfoData, j); - memcpy(data + (*columnLen), src, varDataTLen(src)); - int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); - ASSERT(v > 0); + char* bitmap = (char*)pPage + startOffset; + columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); + char* data = (char*)columnLen + sizeof(int32_t); - contentLen = varDataTLen(src); + bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); + if (isNull) { + colDataSetNull_f(bitmap, (*rows)); + } else { + memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); + ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); + } + contentLen = bytes; } - } else { - char* bitmap = (char*)pPage + startOffset; - columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); - char* data = (char*)columnLen + sizeof(int32_t); - bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); - if (isNull) { - colDataSetNull_f(bitmap, (*rows)); - } else { - memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); - ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); - } - contentLen = bytes; + (*columnLen) += contentLen; } - (*columnLen) += contentLen; + (*rows) += 1; + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pBuf, pPage); + } else { + SSDataBlock* dataNotLoadBlock = createOneDataBlock(pBlock, true); + if (dataNotLoadBlock == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + if (pInfo->blockForNotLoaded == NULL) { + pInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); + pInfo->offsetForNotLoaded = 0; + } + dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; + dataNotLoadBlock->info.dataLoad = 0; + taosArrayInsert(pInfo->blockForNotLoaded, pInfo->blockForNotLoaded->size, &dataNotLoadBlock); + break; } - - (*rows) += 1; - - setBufPageDirty(pPage, true); - releaseBufPage(pInfo->pBuf, pPage); } } @@ -734,6 +751,14 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { } taosArrayClear(pInfo->sortedGroupArray); clearDiskbasedBuf(pInfo->pBuf); + if (pInfo->blockForNotLoaded) { + for (int32_t i = 0; i < pInfo->blockForNotLoaded->size; i++) { + SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, i); + blockDataDestroy(*pBlock); + } + taosArrayClear(pInfo->blockForNotLoaded); + pInfo->offsetForNotLoaded = 0; + } } static int compareDataGroupInfo(const void* group1, const void* group2) { @@ -747,6 +772,21 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1; } +static SSDataBlock* buildPartitionResultForNotLoadBlock(SOperatorInfo* pOperator) { + SPartitionOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (pInfo->blockForNotLoaded && pInfo->offsetForNotLoaded < pInfo->blockForNotLoaded->size) { + SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, pInfo->offsetForNotLoaded); + pInfo->offsetForNotLoaded++; + return *pBlock; + } else { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } +} + static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -757,12 +797,10 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { // try next group data - ++pInfo->groupIndex; - if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { - setOperatorCompleted(pOperator); - clearPartitionOperator(pInfo); - return NULL; + if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) { + return buildPartitionResultForNotLoadBlock(pOperator); } + ++pInfo->groupIndex; pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); pInfo->pageIndex = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 110aabf9b1..fb86e73a64 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -769,6 +769,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pTableScanInfo->pResBlock; bool hasNext = false; int32_t code = TSDB_CODE_SUCCESS; + pBlock->info.dataLoad = false; int64_t st = taosGetTimestampUs();