Merge pull request #19652 from taosdata/fix/liaohj
fix(query): do some internal refactoring and identify a bug.
Commit: 8e73e3090c
@@ -1431,6 +1431,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
   pBlock->info.rows = 0;
   pBlock->info.capacity = 0;
   pBlock->info.rowSize = 0;
+  pBlock->info.id = pDataBlock->info.id;
 
   size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
   for (int32_t i = 0; i < numOfCols; ++i) {
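
A note on the hunk above: createOneDataBlock() clones a block's layout, and before this patch the clone's info.id (table/group identity) was simply not copied from the source block. A minimal sketch of the effect, assuming the TDengine executor headers are in scope (pSrc is an illustrative name):

    // Sketch only; SSDataBlock and createOneDataBlock() come from the executor headers.
    SSDataBlock* pClone = createOneDataBlock(pSrc, false);
    // After this patch: pClone->info.id.groupId == pSrc->info.id.groupId,
    // while rows/capacity/rowSize still restart at zero, as the hunk shows.
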
@@ -704,9 +704,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
                              SDiskbasedBuf* pBuf);
 
 bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
+bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
 void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
 void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
-bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
+bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
 
 void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                      int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
@@ -218,10 +218,7 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
     if (status == PROJECT_RETRIEVE_CONTINUE) {
       continue;
     } else if (status == PROJECT_RETRIEVE_DONE) {
-      size_t rows = pBlock->info.rows;
-      pExchangeInfo->limitInfo.numOfOutputRows += rows;
-
-      if (rows == 0) {
+      if (pBlock->info.rows == 0) {
         setOperatorCompleted(pOperator);
         return NULL;
       } else {
@@ -707,6 +704,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
 }
 
 int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+
   if (pLimitInfo->remainGroupOffset > 0) {
     if (pLimitInfo->currentGroupId == 0) {  // it is the first group
       pLimitInfo->currentGroupId = pBlock->info.id.groupId;
@@ -750,36 +749,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
   // set current group id
   pLimitInfo->currentGroupId = pBlock->info.id.groupId;
 
-  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
-    pLimitInfo->remainOffset -= pBlock->info.rows;
-    blockDataCleanup(pBlock);
+  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
+  if (pBlock->info.rows == 0) {
     return PROJECT_RETRIEVE_CONTINUE;
-  } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
-    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
-    pLimitInfo->remainOffset = 0;
-  }
-
-  // check for the limitation in each group
-  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
-    int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
-    blockDataKeepFirstNRows(pBlock, keepRows);
-    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
+  } else {
+    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
       setOperatorCompleted(pOperator);
-    } else {
-      // current group limitation is reached, and future blocks of this group need to be discarded.
-      if (pBlock->info.rows == 0) {
-        return PROJECT_RETRIEVE_CONTINUE;
-      }
+      return PROJECT_RETRIEVE_DONE;
     }
-
-    return PROJECT_RETRIEVE_DONE;
   }
 
   // todo optimize performance
   // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
   // they may not belong to the same group the limit/offset value is not valid in this case.
-  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
-      pLimitInfo->slimit.limit != -1) {
+  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
     return PROJECT_RETRIEVE_DONE;
   } else {  // not full enough, continue to accumulate the output data in the buffer.
     return PROJECT_RETRIEVE_CONTINUE;
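
The rewrite above replaces handleLimitOffset()'s hand-rolled offset/limit arithmetic with one call to the shared applyLimitOffset() helper. A condensed sketch of the resulting control flow (names as in the hunk; the leading remainGroupOffset branch is omitted):

    bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
    if (pBlock->info.rows == 0) {
      return PROJECT_RETRIEVE_CONTINUE;  // the offset consumed the whole block
    } else if (limitReached && pLimitInfo->slimit.limit > 0 &&
               pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
      setOperatorCompleted(pOperator);   // row limit hit and the group quota is exhausted
      return PROJECT_RETRIEVE_DONE;
    }
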
@@ -1749,6 +1749,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
           pLimitInfo->slimit.offset != -1);
 }
 
+bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
+  return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
+}
+
 void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
   SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
   SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
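
For comparison with the new helper, the existing hasLimitOffsetInfo() (only its tail is visible in the hunk) also checks the plain limit pair; its full body is presumably:

    // Reconstructed from the fragment above; treat as an assumption, not verbatim source.
    bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
      return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 ||
              pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
    }

The new hasSlimitOffsetInfo() isolates the slimit pair, so call sites such as handleLimitOffset() can test for SLIMIT/SOFFSET without also matching plain LIMIT/OFFSET.
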
@@ -90,7 +90,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
 
   pInfo->binfo.pRes = pResBlock;
   pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
-  pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock;
+
+  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
+    pInfo->mergeDataBlocks = false;
+  } else {
+    if (!pProjPhyNode->ignoreGroupId) {
+      pInfo->mergeDataBlocks = false;
+    } else {
+      pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
+    }
+  }
 
   int32_t numOfRows = 4096;
   size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
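
The nested form above spells out the precedence; logically it is equivalent to a single expression (shown only for clarity, not part of the patch):

    pInfo->mergeDataBlocks = (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) &&
                             pProjPhyNode->ignoreGroupId && pProjPhyNode->mergeDataBlock;

That is, blocks are only merged when the projection ignores group ids, since merging would otherwise fold rows from different groups into one block.
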
@@ -185,36 +194,15 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
                                     SOperatorInfo* pOperator) {
   // set current group id
   pLimitInfo->currentGroupId = groupId;
-  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
-    pLimitInfo->remainOffset -= pBlock->info.rows;
-    blockDataCleanup(pBlock);
+  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
+  if (pBlock->info.rows == 0) {
     return PROJECT_RETRIEVE_CONTINUE;
-  } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
-    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
-    pLimitInfo->remainOffset = 0;
-  }
-
-  // check for the limitation in each group
-  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
-    int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
-    blockDataKeepFirstNRows(pBlock, keepRows);
-
-    // TODO: optimize it later when partition by + limit
-    // all retrieved requirement has been fulfilled, let's finish this
-    if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
-        (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
+  } else {
+    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
       setOperatorCompleted(pOperator);
-    } else {
-      // Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
-      // from next group. So let's continue this retrieve process
-      if (keepRows == 0) {
-        return PROJECT_RETRIEVE_CONTINUE;
-      }
     }
   }
 
-  pLimitInfo->numOfOutputRows += pBlock->info.rows;
   return PROJECT_RETRIEVE_DONE;
 }
 
@@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
   }
 }
 
-// todo handle the slimit info
-bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
+bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
   SLimit*     pLimit = &pLimitInfo->limit;
   const char* id = GET_TASKID(pTaskInfo);
 
-  if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
+  if (pLimitInfo->remainOffset > 0) {
     if (pLimitInfo->remainOffset >= pBlock->info.rows) {
       pLimitInfo->remainOffset -= pBlock->info.rows;
       blockDataEmpty(pBlock);
@@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
   if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
     // limit the output rows
     int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
 
     blockDataKeepFirstNRows(pBlock, keep);
 
+    pLimitInfo->numOfOutputRows += pBlock->info.rows;
     qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
     return true;
   }
 
+  pLimitInfo->numOfOutputRows += pBlock->info.rows;
   return false;
 }
 
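
With the two added lines, applyLimitOffset() now owns the numOfOutputRows bookkeeping on both the limit-reached and the normal path. Callers that previously accumulated the count themselves (see the loadRemoteData, loadDataBlock, and getSortedTableMergeScanBlockData hunks) must not add pBlock->info.rows again; the expected call pattern is now roughly:

    bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
    if (limitReached) {
      setOperatorCompleted(pOperator);  // or resetLimitInfoForNextGroup() in group-sorted paths
    }
    // pLimitInfo->numOfOutputRows already reflects pBlock->info.rows here
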
@@ -393,13 +394,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
     }
   }
 
-  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
+  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
   if (limitReached) {  // set operator flag is done
     setOperatorCompleted(pOperator);
   }
 
   pCost->totalRows += pBlock->info.rows;
-  pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;
   return TSDB_CODE_SUCCESS;
 }
 
@@ -2714,9 +2714,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
     }
   }
 
-  applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
-  pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
-
+  applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
   qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
          pInfo->limitInfo.numOfOutputRows);
 
@@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
     T_LONG_JMP(pTaskInfo->env, code);
   }
 
+  // multi-group case not handle here
   SSDataBlock* pBlock = NULL;
   while (1) {
     pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
@@ -236,28 +237,14 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
       continue;
     }
 
-    // todo add the limit/offset info
-    if (pInfo->limitInfo.remainOffset > 0) {
-      if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) {
-        pInfo->limitInfo.remainOffset -= pBlock->info.rows;
-        continue;
-      }
-
-      blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset);
-      pInfo->limitInfo.remainOffset = 0;
+    // there are bugs?
+    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
+    if (limitReached) {
+      resetLimitInfoForNextGroup(&pInfo->limitInfo);
     }
 
-    if (pInfo->limitInfo.limit.limit > 0 &&
-        pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) {
-      int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows;
-      blockDataKeepFirstNRows(pBlock, remain);
-    }
-
-    size_t numOfRows = blockDataGetNumOfRows(pBlock);
-    pInfo->limitInfo.numOfOutputRows += numOfRows;
-    pOperator->resultInfo.totalRows += numOfRows;
-
-    if (numOfRows > 0) {
+    pOperator->resultInfo.totalRows += pBlock->info.rows;
+    if (pBlock->info.rows > 0) {
       break;
     }
   }
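
Condensed skeleton of the new doSort() loop, with editorial comments on what each step now does (names as in the hunk; block retrieval elided):

    while (1) {
      // ... fetch the next sorted block into pBlock (see the hunk above) ...
      bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
      if (limitReached) {
        resetLimitInfoForNextGroup(&pInfo->limitInfo);  // the spot the author marks "there are bugs?"
      }
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      if (pBlock->info.rows > 0) {
        break;  // keep looping while the offset swallows entire blocks
      }
    }

Per the commit title, the questionable per-group reset in this single-group sort path is identified here rather than resolved.
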
@@ -557,7 +544,6 @@ typedef struct SMultiwayMergeOperatorInfo {
   SSDataBlock*  pIntermediateBlock;  // to hold the intermediate result
   int64_t       startTs;             // sort start time
   bool          groupSort;
-  bool          hasGroupId;
   uint64_t      groupId;
   STupleHandle* prefetchedTuple;
 } SMultiwayMergeOperatorInfo;
@@ -604,7 +590,9 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
   return TSDB_CODE_SUCCESS;
 }
 
-static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) {
+static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
+                                 SSDataBlock* p, bool* newgroup) {
+  *newgroup = false;
 
   while (1) {
     STupleHandle* pTupleHandle = NULL;
@@ -613,8 +601,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
         pTupleHandle = tsortNextTuple(pHandle);
       } else {
         pTupleHandle = pInfo->prefetchedTuple;
-        pInfo->groupId = tsortGetGroupId(pTupleHandle);
         pInfo->prefetchedTuple = NULL;
+        uint64_t gid = tsortGetGroupId(pTupleHandle);
+        if (gid != pInfo->groupId) {
+          *newgroup = true;
+          pInfo->groupId = gid;
+        }
       }
     } else {
       pTupleHandle = tsortNextTuple(pHandle);
@@ -627,12 +619,10 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
 
     if (pInfo->groupSort) {
       uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
-      if (!pInfo->hasGroupId) {
+      if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
+        appendOneRowToDataBlock(p, pTupleHandle);
+        p->info.id.groupId = tupleGroupId;
         pInfo->groupId = tupleGroupId;
-        pInfo->hasGroupId = true;
-        appendOneRowToDataBlock(p, pTupleHandle);
-      } else if (pInfo->groupId == tupleGroupId) {
-        appendOneRowToDataBlock(p, pTupleHandle);
       } else {
         pInfo->prefetchedTuple = pTupleHandle;
         break;
@@ -645,11 +635,6 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
       break;
     }
   }
-
-  if (pInfo->groupSort) {
-    pInfo->hasGroupId = false;
-  }
-
 }
 
 SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
@@ -673,14 +658,19 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
   }
 
   SSDataBlock* p = pInfo->pIntermediateBlock;
+  bool         newgroup = false;
 
   while (1) {
-    doGetSortedBlockData(pInfo, pHandle, capacity, p);
+    doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
     if (p->info.rows == 0) {
       break;
     }
 
-    bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
+    if (newgroup) {
+      resetLimitInfoForNextGroup(&pInfo->limitInfo);
+    }
+
+    bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
     if (limitReached) {
       resetLimitInfoForNextGroup(&pInfo->limitInfo);
     }
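
Taken together, the multiway-merge hunks replace the hasGroupId flag with explicit group tracking: doGetSortedBlockData() reports through *newgroup when the look-ahead (prefetched) tuple opens a different group, and getMultiwaySortedBlockData() resets the per-group limit state before consuming that block. Previously the prefetched tuple's group id silently overwrote pInfo->groupId, so the caller never saw the boundary and the LIMIT/OFFSET counters leaked into the next group. The handshake, condensed from the hunks above:

    bool newgroup = false;
    doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);  // sets *newgroup at a group boundary
    if (newgroup) {
      resetLimitInfoForNextGroup(&pInfo->limitInfo);  // fresh counters for the new group
    }
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
    if (limitReached) {
      resetLimitInfoForNextGroup(&pInfo->limitInfo);  // this group's quota is spent
    }
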
@@ -1016,7 +1016,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
 
   TSWAP(pProject->node.pLimit, pSelect->pLimit);
   TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
-  pProject->ignoreGroupId = (NULL == pSelect->pPartitionByList);
+  pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList);
   pProject->node.groupAction =
       (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
   pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
@@ -560,6 +560,8 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
     if (NULL == pMerge->node.pLimit) {
       code = TSDB_CODE_OUT_OF_MEMORY;
     }
+    ((SLimitNode*)pSplitNode->pLimit)->limit += ((SLimitNode*)pSplitNode->pLimit)->offset;
+    ((SLimitNode*)pSplitNode->pLimit)->offset = 0;
   }
   if (TSDB_CODE_SUCCESS == code) {
     if (NULL == pSubplan) {
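
The two added lines are the usual limit push-down rewrite: the merge node applies the real OFFSET, so the copy of the limit pushed below it must deliver the first limit + offset rows and carry no offset of its own. A worked example with invented values, assuming a query ending in LIMIT 10 OFFSET 5:

    SLimitNode* pPushed = (SLimitNode*)pSplitNode->pLimit;  // limit = 10, offset = 5 (hypothetical)
    pPushed->limit += pPushed->offset;                      // each child now returns its first 15 rows
    pPushed->offset = 0;                                    // the merge node skips the global 5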