fix group count

This commit is contained in:
“happyguoxy” 2024-06-07 16:49:29 +08:00
parent 7a8e87f8cd
commit 6d98a56778
3 changed files with 66 additions and 51 deletions

View File

@ -624,6 +624,8 @@ typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
SArray* blockForNotLoaded; // SSDataBlock that data is not loaded
int32_t offsetForNotLoaded; // read offset for SSDataBlock that data is not loaded
} SDataGroupInfo;
typedef struct SWindowRowsSup {

View File

@ -53,8 +53,6 @@ typedef struct SPartitionOperatorInfo {
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
SArray* blockForNotLoaded; // SSDataBlock that data is not loaded
int32_t offsetForNotLoaded;// read offset for SSDataBlock that data is not loaded
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SExprSupp scalarSup;
@ -573,21 +571,14 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info;
pDstBlock->info.rows = 0;
copyPkVal(&pDstBlock->info, &pDataBlock->info);
pDstBlock->info.rows = pDataBlock->info.rows;
pDstBlock->info.capacity = 0;
pDstBlock->info.rowSize = 0;
pDstBlock->info.id = pDataBlock->info.id;
pDstBlock->info.blankFill = pDataBlock->info.blankFill;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
if (pDstBlock->pBlockAgg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
copyPkVal(&pDstBlock->info, &pDataBlock->info);
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
@ -605,24 +596,25 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc
return NULL;
}
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
if (slotId < numOfCols) {
pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i];
pDstBlock->pBlockAgg[slotId].colId = i;
if (pDataBlock->pBlockAgg != NULL) {
pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
if (pDstBlock->pBlockAgg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataDestroy(pDstBlock);
return NULL;
}
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
if (slotId < numOfCols) {
pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i];
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, slotId);
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
}
}
// SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
// if (pSrc) {
// SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
// colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
// }
}
pDstBlock->info.rows = pDataBlock->info.rows;
pDstBlock->info.capacity = pDataBlock->info.rows;
return pDstBlock;
}
@ -718,13 +710,14 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (dataNotLoadBlock == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (pInfo->blockForNotLoaded == NULL) {
pInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock));
pInfo->offsetForNotLoaded = 0;
if (pGroupInfo->blockForNotLoaded == NULL) {
pGroupInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock));
pGroupInfo->offsetForNotLoaded = 0;
}
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
dataNotLoadBlock->info.dataLoad = 0;
taosArrayInsert(pInfo->blockForNotLoaded, pInfo->blockForNotLoaded->size, &dataNotLoadBlock);
pInfo->binfo.pRes->info.rows = pBlock->info.rows;
taosArrayInsert(pGroupInfo->blockForNotLoaded, pGroupInfo->blockForNotLoaded->size, &dataNotLoadBlock);
break;
}
}
@ -808,18 +801,18 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
for (int32_t i = 0; i < size; i++) {
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
if (pGp->blockForNotLoaded) {
for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) {
SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i);
blockDataDestroy(*pBlock);
}
taosArrayClear(pGp->blockForNotLoaded);
pGp->offsetForNotLoaded = 0;
}
taosArrayDestroy(pGp->pPageList);
}
taosArrayClear(pInfo->sortedGroupArray);
clearDiskbasedBuf(pInfo->pBuf);
if (pInfo->blockForNotLoaded) {
for (int32_t i = 0; i < pInfo->blockForNotLoaded->size; i++) {
SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, i);
blockDataDestroy(*pBlock);
}
taosArrayClear(pInfo->blockForNotLoaded);
pInfo->offsetForNotLoaded = 0;
}
}
static int compareDataGroupInfo(const void* group1, const void* group2) {
@ -833,19 +826,13 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
}
static SSDataBlock* buildPartitionResultForNotLoadBlock(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pInfo->blockForNotLoaded && pInfo->offsetForNotLoaded < pInfo->blockForNotLoaded->size) {
SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, pInfo->offsetForNotLoaded);
pInfo->offsetForNotLoaded++;
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) {
SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded);
pGroupInfo->offsetForNotLoaded++;
return *pBlock;
} else {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}
return NULL;
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
@ -857,9 +844,15 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SDataGroupInfo* pGroupInfo =
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
if(pGroupInfo != NULL) {
SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
if(ret != NULL) return ret;
}
// try next group data
if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) {
return buildPartitionResultForNotLoadBlock(pOperator);
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}
++pInfo->groupIndex;
@ -873,6 +866,22 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (*(int32_t*)page == 0) {
SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
if (ret != NULL) return ret;
releaseBufPage(pInfo->pBuf, page);
if (pInfo->pageIndex < taosArrayGetSize(pGroupInfo->pPageList)) {
pInfo->pageIndex += 1;
} else if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
pInfo->groupIndex++;
pInfo->pageIndex = 0;
} else {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}
return buildPartitionResult(pOperator);
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
@ -882,6 +891,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
pInfo->binfo.pRes->info.dataLoad = 1;
pInfo->orderedRows = 0;
} else if (pInfo->pOrderInfoArr == NULL) {
qError("Exception, remainRows not zero, but pOrderInfoArr is NULL");
}
if (pInfo->pOrderInfoArr) {
@ -944,6 +955,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
while (pGroupIter != NULL) {
SDataGroupInfo* pGroupInfo = pGroupIter;
taosArrayPush(groupArray, pGroupInfo);
static int i = 0;
qInfo("groupArray push %p %p %d times", pGroupInfo, pGroupInfo->blockForNotLoaded, ++i);
pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
}

View File

@ -3628,7 +3628,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex);
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);