enhance: pass simple test

This commit is contained in:
slzhou 2023-07-16 20:28:54 +08:00
parent 8b6d7db7ad
commit f93af4d2e0
2 changed files with 6 additions and 14 deletions

View File

@ -238,6 +238,7 @@ typedef struct STableMergeScanInfo {
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
SSDataBlock* pReaderBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;

View File

@ -53,7 +53,6 @@ typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
typedef struct STableCountScanOperatorInfo {
@ -2735,7 +2734,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
SSDataBlock* pBlock = pInfo->pReaderBlock;
int32_t code = 0;
int64_t st = taosGetTimestampUs();
@ -2753,6 +2752,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
} else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) {
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1);
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->readIdx = readIdx + pInfo->tableStartIndex ;
}
STsdbReader* reader = pInfo->base.dataReader;
@ -2802,8 +2802,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
return pBlock;
}
blockDataDestroy(source->inputBlock);
source->inputBlock = NULL;
return NULL;
}
@ -2872,7 +2870,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
taosArrayPush(pInfo->sortSourceParams, &param);
}
@ -2906,10 +2903,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->sortSourceParams); ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
}
taosArrayClear(pInfo->sortSourceParams);
tsortDestroySortHandle(pInfo->pSortHandle);
@ -3014,11 +3007,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
for (int32_t i = 0; i < numOfTable; i++) {
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
blockDataDestroy(p->inputBlock);
}
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
@ -3030,6 +3018,7 @@ void destroyTableMergeScanOperatorInfo(void* param) {
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
taosMemoryFreeClear(param);
@ -3115,6 +3104,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);