Merge pull request #13629 from taosdata/szhou/feature/multiwaymerge
fix: add group id to sort operator
This commit is contained in:
commit
0b549e9de8
|
@ -1217,6 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
|||
pBlock->info.numOfCols = numOfCols;
|
||||
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
|
||||
pBlock->info.rowSize = pDataBlock->info.rowSize;
|
||||
pBlock->info.groupId = pDataBlock->info.groupId;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {0};
|
||||
|
|
|
@ -688,6 +688,10 @@ typedef struct SSortedMergeOperatorInfo {
|
|||
int32_t numOfResPerPage;
|
||||
char** groupVal;
|
||||
SArray *groupInfo;
|
||||
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
} SSortedMergeOperatorInfo;
|
||||
|
||||
typedef struct SSortOperatorInfo {
|
||||
|
@ -700,6 +704,10 @@ typedef struct SSortOperatorInfo {
|
|||
|
||||
int64_t startTs; // sort start time
|
||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||
|
||||
STupleHandle *prefetchedTuple;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
} SSortOperatorInfo;
|
||||
|
||||
typedef struct STagFilterOperatorInfo {
|
||||
|
@ -759,7 +767,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData
|
|||
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
||||
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
||||
|
||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
|
||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo);
|
||||
SSDataBlock* loadNextDataBlock(void* param);
|
||||
|
||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||
|
|
|
@ -3103,6 +3103,68 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
|||
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SSortedMergeOperatorInfo *pInfo) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
pInfo->hasGroupId = true;
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else if (pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else {
|
||||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||
colDataAssign(pDst, pSrc, p->info.rows);
|
||||
}
|
||||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.capacity = p->info.rows;
|
||||
pDataBlock->info.groupId = pInfo->groupId;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -3111,7 +3173,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL);
|
||||
return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
|
||||
}
|
||||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
|
|
|
@ -42,6 +42,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
|||
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
pOperator->name = "SortOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||
pOperator->blocking = true;
|
||||
|
@ -81,8 +83,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
|||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||
SArray* pColMatchInfo) {
|
||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SSortOperatorInfo* pInfo) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
|
@ -93,14 +95,33 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
pTupleHandle = pInfo->prefetchedTuple;
|
||||
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
}
|
||||
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->groupId = tupleGroupId;
|
||||
pInfo->hasGroupId = true;
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else if (pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
} else {
|
||||
pInfo->prefetchedTuple = pTupleHandle;
|
||||
break;
|
||||
}
|
||||
|
||||
if (p->info.rows >= capacity) {
|
||||
return pDataBlock;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,6 +138,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.capacity = p->info.rows;
|
||||
pDataBlock->info.groupId = pInfo->groupId;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
@ -188,8 +210,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
|||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock =
|
||||
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
pInfo->pColMatchInfo, pInfo);
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
|
@ -230,11 +252,11 @@ typedef struct SMultiwaySortMergeOperatorInfo {
|
|||
SArray* pColMatchInfo; // for index map from table scan output
|
||||
|
||||
SSDataBlock* pInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
int64_t startTs; // sort start time
|
||||
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle *prefetchedTuple;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
} SMultiwaySortMergeOperatorInfo;
|
||||
|
||||
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||
|
@ -274,7 +296,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
|
||||
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
|
@ -285,7 +307,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
|
@ -314,7 +335,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
|
@ -337,7 +357,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||
}
|
||||
|
||||
|
||||
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -351,12 +370,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
|||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock =
|
||||
getMultiwaySortedBlockData(pInfo->pSortHandle,
|
||||
pInfo->binfo.pRes,
|
||||
pOperator->resultInfo.capacity,
|
||||
pInfo->pColMatchInfo,
|
||||
pInfo);
|
||||
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
|
||||
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo);
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
|
@ -367,7 +382,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
||||
|
||||
|
@ -387,9 +402,9 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
|
||||
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
||||
SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo,
|
||||
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
int32_t rowSize = pResBlock->info.rowSize;
|
||||
|
@ -413,7 +428,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
|||
|
||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||
|
|
|
@ -528,16 +528,24 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||
taosArrayClear(pHandle->pOrderedSource);
|
||||
|
||||
|
||||
bool hasGroupId = false;
|
||||
SSDataBlock* prefetchedDataBlock = NULL;
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||
SSDataBlock* pBlock = NULL;
|
||||
if (prefetchedDataBlock == NULL) {
|
||||
pBlock = pHandle->fetchfp(source->param);
|
||||
} else {
|
||||
pBlock = prefetchedDataBlock;
|
||||
prefetchedDataBlock = NULL;
|
||||
}
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pHandle->pDataBlock == NULL) {
|
||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
|
||||
if (!hasGroupId) {
|
||||
// calculate the buffer pages according to the total available buffers.
|
||||
int32_t rowSize = blockDataGetRowSize(pBlock);
|
||||
if (rowSize * 4 > 4096) {
|
||||
|
@ -549,29 +557,36 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
// todo!!
|
||||
pHandle->numOfPages = 1024;
|
||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
|
||||
hasGroupId = true;
|
||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
}
|
||||
|
||||
// perform the scalar function calculation before apply the sort
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) {
|
||||
// perform the scalar function calculation before apply the sort
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
// todo relocate the columns
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo relocate the columns
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
|
||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||
if (size > sortBufSize) {
|
||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
} else {
|
||||
prefetchedDataBlock = pBlock;
|
||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue