fix(query): remove group info during sort.
This commit is contained in:
parent
87300b2ab0
commit
bd518f3f71
|
@ -696,10 +696,6 @@ typedef struct SSortedMergeOperatorInfo {
|
|||
int32_t numOfResPerPage;
|
||||
char** groupVal;
|
||||
SArray *groupInfo;
|
||||
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
} SSortedMergeOperatorInfo;
|
||||
|
||||
typedef struct SSortOperatorInfo {
|
||||
|
@ -712,10 +708,6 @@ typedef struct SSortOperatorInfo {
|
|||
|
||||
int64_t startTs; // sort start time
|
||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||
|
||||
STupleHandle *prefetchedTuple;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
} SSortOperatorInfo;
|
||||
|
||||
typedef struct STagFilterOperatorInfo {
|
||||
|
|
|
@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
|
|||
*/
|
||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pVHandle
|
||||
* @return
|
||||
*/
|
||||
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
|
||||
/**
|
||||
*
|
||||
* @param pSortHandle
|
||||
|
|
|
@ -3030,31 +3030,12 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
|
|||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
pInfo->hasGroupId = true;
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else if (pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else {
|
||||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
|
@ -3073,7 +3054,6 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
|
|||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.capacity = p->info.rows;
|
||||
pDataBlock->info.groupId = pInfo->groupId;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
@ -3339,7 +3319,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
|||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL;
|
||||
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
return (rows == 0) ? NULL : pInfo->pRes;
|
||||
|
@ -4920,7 +4900,10 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
}
|
||||
|
||||
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||
if(!pNodeList) return NULL;
|
||||
if(!pNodeList) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||
if (pList == NULL) {
|
||||
|
|
|
@ -2175,25 +2175,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
|
|||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
pInfo->hasGroupId = true;
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else if (pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else {
|
||||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
|
|
|
@ -40,15 +40,13 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
|||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
pOperator->name = "SortOperator";
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
pOperator->name = "SortOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
||||
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize +
|
||||
|
@ -97,31 +95,12 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
pInfo->hasGroupId = true;
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else if (pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else {
|
||||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
|
@ -140,7 +119,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.capacity = p->info.rows;
|
||||
pDataBlock->info.groupId = pInfo->groupId;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
@ -255,10 +233,7 @@ typedef struct SMultiwaySortMergeOperatorInfo {
|
|||
|
||||
SSDataBlock* pInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
uint64_t groupId;
|
||||
} SMultiwaySortMergeOperatorInfo;
|
||||
|
||||
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||
|
@ -312,31 +287,12 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
pInfo->hasGroupId = true;
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else if (pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else {
|
||||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
|
@ -432,14 +388,12 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
|||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||
|
||||
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
|
||||
numOfSources = TMAX(2, numOfSources);
|
||||
numOfSources = TMAX(4, numOfSources);
|
||||
|
||||
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
|
||||
|
||||
|
|
|
@ -557,59 +557,40 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||
taosArrayClear(pHandle->pOrderedSource);
|
||||
|
||||
bool hasGroupId = false;
|
||||
SSDataBlock* prefetchedDataBlock = NULL;
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = NULL;
|
||||
if (prefetchedDataBlock == NULL) {
|
||||
pBlock = pHandle->fetchfp(source->param);
|
||||
} else {
|
||||
pBlock = prefetchedDataBlock;
|
||||
prefetchedDataBlock = NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!hasGroupId) {
|
||||
// calculate the buffer pages according to the total available buffers.
|
||||
if (pHandle->pDataBlock == NULL) {
|
||||
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
|
||||
|
||||
// todo, number of pages are set according to the total available sort buffer
|
||||
pHandle->numOfPages = 1024;
|
||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
|
||||
hasGroupId = true;
|
||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
}
|
||||
|
||||
if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) {
|
||||
// perform the scalar function calculation before apply the sort
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
} else {
|
||||
prefetchedDataBlock = pBlock;
|
||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -758,10 +739,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
|||
}
|
||||
}
|
||||
|
||||
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
|
||||
return pVHandle->pBlock->info.groupId;
|
||||
}
|
||||
|
||||
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||
SSortExecInfo info = {0};
|
||||
|
||||
|
|
Loading…
Reference in New Issue