fix: add group id to sorted merge operator
This commit is contained in:
parent
414b3242a7
commit
80a29a6a31
|
@ -688,6 +688,10 @@ typedef struct SSortedMergeOperatorInfo {
|
||||||
int32_t numOfResPerPage;
|
int32_t numOfResPerPage;
|
||||||
char** groupVal;
|
char** groupVal;
|
||||||
SArray *groupInfo;
|
SArray *groupInfo;
|
||||||
|
|
||||||
|
bool hasGroupId;
|
||||||
|
uint64_t groupId;
|
||||||
|
STupleHandle* prefetchedTuple;
|
||||||
} SSortedMergeOperatorInfo;
|
} SSortedMergeOperatorInfo;
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
|
|
|
@ -3133,6 +3133,68 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
|
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||||
|
SSortedMergeOperatorInfo *pInfo) {
|
||||||
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
|
if (p == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
STupleHandle* pTupleHandle = NULL;
|
||||||
|
if (pInfo->prefetchedTuple == NULL) {
|
||||||
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
} else {
|
||||||
|
pTupleHandle = pInfo->prefetchedTuple;
|
||||||
|
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
pInfo->prefetchedTuple = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTupleHandle == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
if (!pInfo->hasGroupId) {
|
||||||
|
pInfo->groupId = tupleGroupId;
|
||||||
|
pInfo->hasGroupId = true;
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else if (pInfo->groupId == tupleGroupId) {
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else {
|
||||||
|
pInfo->prefetchedTuple = pTupleHandle;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.rows > 0) {
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||||
|
colDataAssign(pDst, pSrc, p->info.rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pDataBlock->info.rows = p->info.rows;
|
||||||
|
pDataBlock->info.capacity = p->info.rows;
|
||||||
|
pDataBlock->info.groupId = pInfo->groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -3141,7 +3203,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL);
|
return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
|
Loading…
Reference in New Issue