fix: use different datablock for each sort source

This commit is contained in:
slzhou 2022-06-15 20:12:04 +08:00
parent b6bddd3fb1
commit 8efaa65ade
1 changed files with 6 additions and 5 deletions

View File

@ -2052,17 +2052,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
typedef struct STableMergeScanSortSourceParam { typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator; SOperatorInfo* pOperator;
int32_t readerIdx; int32_t readerIdx;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam; } STableMergeScanSortSourceParam;
static SSDataBlock* getTableDataBlock(void* param) { static SSDataBlock* getTableDataBlock(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator; SOperatorInfo* pOperator = source->pOperator;
int32_t readerIdx = source->readerIdx; int32_t readerIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info; STableMergeScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
blockDataCleanup(pBlock);
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) { if (isTaskKilled(pOperator->pTaskInfo)) {
@ -2097,7 +2100,6 @@ static SSDataBlock* getTableDataBlock(void* param) {
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
return pBlock; return pBlock;
} }
return NULL; return NULL;
@ -2134,9 +2136,7 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
size_t numReaders = taosArrayGetSize(pInfo->dataReaders); size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
for (int32_t i = 0; i < numReaders; ++i) { for (int32_t i = 0; i < numReaders; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
param->readerIdx = i;
param->pOperator = pOperator;
ps->param = param; ps->param = param;
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
} }
@ -2310,6 +2310,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
param->readerIdx = i; param->readerIdx = i;
param->pOperator = pOperator; param->pOperator = pOperator;
param->inputBlock = createOneDataBlock(pInfo->pResBlock, false);
taosArrayPush(pInfo->sortSourceParams, param); taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param); taosMemoryFree(param);
} }