refactor: do some internal refactor.
This commit is contained in:
parent
f3be445412
commit
6ae82b071e
|
@ -536,6 +536,7 @@ typedef struct SMultiwayMergeOperatorInfo {
|
||||||
SSortHandle* pSortHandle;
|
SSortHandle* pSortHandle;
|
||||||
SColMatchInfo matchInfo;
|
SColMatchInfo matchInfo;
|
||||||
SSDataBlock* pInputBlock;
|
SSDataBlock* pInputBlock;
|
||||||
|
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
|
@ -635,12 +636,19 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
if (pInfo->pIntermediateBlock == NULL) {
|
||||||
if (p == NULL) {
|
pInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle);
|
||||||
return NULL;
|
if (pInfo->pIntermediateBlock == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
blockDataEnsureCapacity(pInfo->pIntermediateBlock, capacity);
|
||||||
|
} else {
|
||||||
|
blockDataCleanup(pInfo->pIntermediateBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(p, capacity);
|
SSDataBlock* p = pInfo->pIntermediateBlock;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
doGetSortedBlockData(pInfo, pHandle, capacity, p);
|
doGetSortedBlockData(pInfo, pHandle, capacity, p);
|
||||||
if (p->info.rows == 0) {
|
if (p->info.rows == 0) {
|
||||||
|
@ -670,7 +678,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
pDataBlock->info.groupId = pInfo->groupId;
|
pDataBlock->info.groupId = pInfo->groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(p);
|
|
||||||
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId,
|
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.groupId,
|
||||||
pDataBlock->info.rows);
|
pDataBlock->info.rows);
|
||||||
|
|
||||||
|
@ -704,6 +711,7 @@ void destroyMultiwayMergeOperatorInfo(void* param) {
|
||||||
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
|
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
|
||||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||||
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
||||||
|
pInfo->pIntermediateBlock = blockDataDestroy(pInfo->pIntermediateBlock);
|
||||||
|
|
||||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||||
taosArrayDestroy(pInfo->pSortInfo);
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
|
|
Loading…
Reference in New Issue