Revert "feat: add group id to multiway-sort-merge operator"
This reverts commit b2d066521b
.
This commit is contained in:
parent
b2d066521b
commit
d5a29fc413
|
@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
|
||||||
*/
|
*/
|
||||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param pVHandle
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param pSortHandle
|
* @param pSortHandle
|
||||||
|
|
|
@ -231,10 +231,6 @@ typedef struct SMultiwaySortMergeOperatorInfo {
|
||||||
|
|
||||||
SSDataBlock* pInputBlock;
|
SSDataBlock* pInputBlock;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
|
|
||||||
bool hasGroupId;
|
|
||||||
uint64_t groupId;
|
|
||||||
STupleHandle *prefetchedTuple;
|
|
||||||
} SMultiwaySortMergeOperatorInfo;
|
} SMultiwaySortMergeOperatorInfo;
|
||||||
|
|
||||||
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
|
@ -273,70 +269,6 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
|
||||||
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
|
|
||||||
blockDataCleanup(pDataBlock);
|
|
||||||
|
|
||||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
|
||||||
if (p == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(p, capacity);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
|
|
||||||
STupleHandle* pTupleHandle = NULL;
|
|
||||||
if (pInfo->prefetchedTuple == NULL) {
|
|
||||||
pTupleHandle = tsortNextTuple(pHandle);
|
|
||||||
} else {
|
|
||||||
pTupleHandle = pInfo->prefetchedTuple;
|
|
||||||
pInfo->prefetchedTuple = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTupleHandle == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
|
||||||
if (!pInfo->hasGroupId) {
|
|
||||||
pInfo->groupId = tupleGroupId;
|
|
||||||
pInfo->hasGroupId = true;
|
|
||||||
appendOneRowToDataBlock(p, pTupleHandle);
|
|
||||||
} else if (pInfo->groupId == tupleGroupId) {
|
|
||||||
appendOneRowToDataBlock(p, pTupleHandle);
|
|
||||||
} else {
|
|
||||||
pInfo->prefetchedTuple = pTupleHandle;
|
|
||||||
pInfo->groupId = tupleGroupId;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (p->info.rows >= capacity) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if (p->info.rows > 0) {
|
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
|
||||||
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
|
||||||
colDataAssign(pDst, pSrc, p->info.rows);
|
|
||||||
}
|
|
||||||
|
|
||||||
pDataBlock->info.rows = p->info.rows;
|
|
||||||
pDataBlock->info.capacity = p->info.rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDataDestroy(p);
|
|
||||||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -351,11 +283,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock =
|
SSDataBlock* pBlock =
|
||||||
getMultiwaySortedBlockData(pInfo->pSortHandle,
|
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||||
pInfo->binfo.pRes,
|
|
||||||
pOperator->resultInfo.capacity,
|
|
||||||
pInfo->pColMatchInfo,
|
|
||||||
pInfo);
|
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
|
|
@ -709,10 +709,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
||||||
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
|
|
||||||
return pVHandle->pBlock->info.groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||||
SSortExecInfo info = {0};
|
SSortExecInfo info = {0};
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,6 @@ if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql select * from tb1 where null;
|
sql select * from tb1 where null;
|
||||||
print $rows
|
|
||||||
if $rows != 0 then
|
if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
Loading…
Reference in New Issue