refactor reader
This commit is contained in:
parent
63827f7551
commit
419e501002
|
@ -4725,9 +4725,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
||||||
|
|
||||||
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
// pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
||||||
pInfo->pReader = NULL;
|
pInfo->pReader = NULL;
|
||||||
|
|
||||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||||
// the additional one is reserved for merge result
|
// the additional one is reserved for merge result
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
||||||
|
@ -4775,7 +4774,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
|
int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
|
||||||
|
|
||||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||||
|
@ -4784,7 +4783,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
||||||
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numReaders; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||||
blockDataDestroy(param->inputBlock);
|
blockDataDestroy(param->inputBlock);
|
||||||
}
|
}
|
||||||
|
@ -4792,10 +4791,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numReaders; ++i) {
|
|
||||||
STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
|
|
||||||
tsdbReaderClose(reader);
|
|
||||||
}
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
|
||||||
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
|
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
|
||||||
taosMemoryFree(cond->colList);
|
taosMemoryFree(cond->colList);
|
||||||
|
@ -4803,8 +4798,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
taosArrayDestroy(pInfo->queryConds);
|
taosArrayDestroy(pInfo->queryConds);
|
||||||
pInfo->queryConds = NULL;
|
pInfo->queryConds = NULL;
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->dataReaders);
|
|
||||||
pInfo->dataReaders = NULL;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4888,15 +4881,17 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
void destroyTableMergeScanOperatorInfo(void* param) {
|
void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
cleanupQueryTableDataCond(&pTableScanInfo->cond);
|
cleanupQueryTableDataCond(&pTableScanInfo->cond);
|
||||||
|
|
||||||
|
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTable; i++) {
|
||||||
|
STableMergeScanSortSourceParam* param = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
||||||
|
blockDataDestroy(param->inputBlock);
|
||||||
|
}
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
|
|
||||||
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
|
|
||||||
tsdbReaderClose(reader);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pTableScanInfo->dataReaders);
|
|
||||||
|
|
||||||
tsdbReaderClose(pTableScanInfo->pReader);
|
tsdbReaderClose(pTableScanInfo->pReader);
|
||||||
|
pTableScanInfo->pReader = NULL;
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
||||||
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
||||||
|
|
Loading…
Reference in New Issue