Merge branch 'feature/3.0_debug_wxy' of github.com:taosdata/TDengine into feature/3.0_debug_wxy
This commit is contained in:
commit
ac41f77669
|
@ -2829,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numReaders; ++i) {
|
||||||
|
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||||
|
blockDataDestroy(param->inputBlock);
|
||||||
|
}
|
||||||
taosArrayClear(pInfo->sortSourceParams);
|
taosArrayClear(pInfo->sortSourceParams);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) {
|
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numReaders; ++i) {
|
||||||
STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
|
STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
|
||||||
tsdbReaderClose(reader);
|
tsdbReaderClose(reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->dataReaders);
|
taosArrayDestroy(pInfo->dataReaders);
|
||||||
pInfo->dataReaders = NULL;
|
pInfo->dataReaders = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) {
|
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
blockDataCleanup(pResBlock);
|
||||||
if (p == NULL) {
|
blockDataEnsureCapacity(pResBlock, capacity);
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(p, capacity);
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
@ -2859,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
appendOneRowToDataBlock(p, pTupleHandle);
|
appendOneRowToDataBlock(pResBlock, pTupleHandle);
|
||||||
if (p->info.rows >= capacity) {
|
if (pResBlock->info.rows >= capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
|
|
||||||
return (p->info.rows > 0) ? p : NULL;
|
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
|
||||||
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
@ -2895,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
while (pInfo->tableStartIndex < tableListSize) {
|
while (pInfo->tableStartIndex < tableListSize) {
|
||||||
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator);
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pBlock->info.groupId = pInfo->groupId;
|
pBlock->info.groupId = pInfo->groupId;
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
@ -2919,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
cleanupQueryTableDataCond(&pTableScanInfo->cond);
|
cleanupQueryTableDataCond(&pTableScanInfo->cond);
|
||||||
|
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
|
||||||
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
|
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
|
||||||
|
|
Loading…
Reference in New Issue