fix: code review

This commit is contained in:
facetosea 2024-06-13 14:13:44 +08:00
parent 5e4107df93
commit ff24eaf94d
1 changed files with 20 additions and 30 deletions

View File

@ -570,45 +570,39 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc
SSDataBlock* pDstBlock = createDataBlock(); SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info; pDstBlock->info = pDataBlock->info;
pDstBlock->info.id = pOperator->resultDataBlockId;
pDstBlock->info.capacity = 0; pDstBlock->info.capacity = 0;
pDstBlock->info.rowSize = 0; pDstBlock->info.rowSize = 0;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = pOperator->exprSupp.numOfExprs;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { if (pDataBlock->pBlockAgg) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
blockDataAppendColInfo(pDstBlock, &colInfo);
}
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
if (pDataBlock->pBlockAgg != NULL) {
pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
if (pDstBlock->pBlockAgg == NULL) { if (pDstBlock->pBlockAgg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataDestroy(pDstBlock); blockDataDestroy(pDstBlock);
return NULL; return NULL;
} }
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];
}
} }
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId; int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
blockDataAppendColInfo(pDstBlock, &colInfo);
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
if (pDataBlock->pBlockAgg) {
pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];
}
} }
return pDstBlock; return pDstBlock;
@ -707,13 +701,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }
if (pGroupInfo->blockForNotLoaded == NULL) { if (pGroupInfo->blockForNotLoaded == NULL) {
pGroupInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock)); pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
pGroupInfo->offsetForNotLoaded = 0; pGroupInfo->offsetForNotLoaded = 0;
} }
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId; dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
dataNotLoadBlock->info.dataLoad = 0; dataNotLoadBlock->info.dataLoad = 0;
pInfo->binfo.pRes->info.rows = pBlock->info.rows; pInfo->binfo.pRes->info.rows = pBlock->info.rows;
taosArrayInsert(pGroupInfo->blockForNotLoaded, pGroupInfo->blockForNotLoaded->size, &dataNotLoadBlock); taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
break; break;
} }
} }
@ -863,12 +857,10 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }
if (*(int32_t*)page == 0) { if (*(int32_t*)page == 0) {
releaseBufPage(pInfo->pBuf, page);
SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo); SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
if (ret != NULL) return ret; if (ret != NULL) return ret;
releaseBufPage(pInfo->pBuf, page); if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
if (pInfo->pageIndex < taosArrayGetSize(pGroupInfo->pPageList)) {
pInfo->pageIndex += 1;
} else if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
pInfo->groupIndex++; pInfo->groupIndex++;
pInfo->pageIndex = 0; pInfo->pageIndex = 0;
} else { } else {
@ -951,8 +943,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
while (pGroupIter != NULL) { while (pGroupIter != NULL) {
SDataGroupInfo* pGroupInfo = pGroupIter; SDataGroupInfo* pGroupInfo = pGroupIter;
taosArrayPush(groupArray, pGroupInfo); taosArrayPush(groupArray, pGroupInfo);
static int i = 0;
qInfo("groupArray push %p %p %d times", pGroupInfo, pGroupInfo->blockForNotLoaded, ++i);
pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter); pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
} }