fix(query): fix bug in multi-group limit/offset of the merge sort .
This commit is contained in:
parent
8258c68b6d
commit
aab31f655c
|
@ -544,7 +544,6 @@ typedef struct SMultiwayMergeOperatorInfo {
|
||||||
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
|
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
bool hasGroupId;
|
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
STupleHandle* prefetchedTuple;
|
STupleHandle* prefetchedTuple;
|
||||||
} SMultiwayMergeOperatorInfo;
|
} SMultiwayMergeOperatorInfo;
|
||||||
|
@ -591,7 +590,9 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) {
|
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
|
||||||
|
SSDataBlock* p, bool* newgroup) {
|
||||||
|
*newgroup = false;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = NULL;
|
STupleHandle* pTupleHandle = NULL;
|
||||||
|
@ -600,8 +601,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
||||||
pTupleHandle = tsortNextTuple(pHandle);
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
} else {
|
} else {
|
||||||
pTupleHandle = pInfo->prefetchedTuple;
|
pTupleHandle = pInfo->prefetchedTuple;
|
||||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
|
||||||
pInfo->prefetchedTuple = NULL;
|
pInfo->prefetchedTuple = NULL;
|
||||||
|
uint64_t gid = tsortGetGroupId(pTupleHandle);
|
||||||
|
if (gid != pInfo->groupId) {
|
||||||
|
*newgroup = true;
|
||||||
|
pInfo->groupId = gid;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pTupleHandle = tsortNextTuple(pHandle);
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
@ -614,12 +619,10 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
||||||
|
|
||||||
if (pInfo->groupSort) {
|
if (pInfo->groupSort) {
|
||||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||||
if (!pInfo->hasGroupId) {
|
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
p->info.id.groupId = tupleGroupId;
|
||||||
pInfo->groupId = tupleGroupId;
|
pInfo->groupId = tupleGroupId;
|
||||||
pInfo->hasGroupId = true;
|
|
||||||
appendOneRowToDataBlock(p, pTupleHandle);
|
|
||||||
} else if (pInfo->groupId == tupleGroupId) {
|
|
||||||
appendOneRowToDataBlock(p, pTupleHandle);
|
|
||||||
} else {
|
} else {
|
||||||
pInfo->prefetchedTuple = pTupleHandle;
|
pInfo->prefetchedTuple = pTupleHandle;
|
||||||
break;
|
break;
|
||||||
|
@ -632,11 +635,6 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->groupSort) {
|
|
||||||
pInfo->hasGroupId = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
|
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
|
||||||
|
@ -660,14 +658,18 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* p = pInfo->pIntermediateBlock;
|
SSDataBlock* p = pInfo->pIntermediateBlock;
|
||||||
|
bool newgroup = false;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
doGetSortedBlockData(pInfo, pHandle, capacity, p);
|
doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
|
||||||
if (p->info.rows == 0) {
|
if (p->info.rows == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo fix it: we need to decide whether this block is belonged to previous group or not .
|
if (newgroup) {
|
||||||
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
|
}
|
||||||
|
|
||||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
|
||||||
if (limitReached) {
|
if (limitReached) {
|
||||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
|
|
Loading…
Reference in New Issue