refactor(query): do some internal refactor.
This commit is contained in:
parent
fc20be8699
commit
11c4878ca1
|
@ -279,7 +279,6 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
|||
|
||||
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
|
||||
|
||||
// size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||
if (!pBlockLoadInfo->sttBlockLoaded) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
pBlockLoadInfo->sttBlockLoaded = true;
|
||||
|
|
|
@ -1482,10 +1482,6 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
|
|||
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
|
||||
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
|
||||
/* if (!outputEveryColumn && pmInfo->reserved) {
|
||||
j++;
|
||||
continue;
|
||||
}*/
|
||||
|
||||
if (p->info.colId == pmInfo->colId) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
|
||||
|
|
|
@ -579,21 +579,8 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) {
|
||||
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
_retry:
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->groupSort) {
|
||||
|
@ -638,22 +625,48 @@ _retry:
|
|||
pInfo->hasGroupId = false;
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) { // todo extract method
|
||||
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
|
||||
if (p->info.rows == 0) {
|
||||
goto _retry;
|
||||
}
|
||||
|
||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
doGetSortedBlockData(pInfo, pHandle, capacity, p);
|
||||
if (p->info.rows == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
|
||||
if (p->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
blockDataEnsureCapacity(pDataBlock, p->info.rows);
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||
// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
|
||||
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
|
||||
}
|
||||
|
||||
pInfo->limitInfo.numOfOutputRows += p->info.rows;
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.groupId = pInfo->groupId;
|
||||
|
|
Loading…
Reference in New Issue