fix: fix memleak during create initial sources for block merge sort
This commit is contained in:
		
							parent
							
								
									2ef67b326a
								
							
						
					
					
						commit
						bcf4c1ebae
					
				| 
						 | 
					@ -2766,7 +2766,7 @@ _error:
 | 
				
			||||||
  return NULL;
 | 
					  return NULL;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static SSDataBlock* getTableDataBlockImpl(void* param) {
 | 
					static SSDataBlock* getBlockForTableMergeScan(void* param) {
 | 
				
			||||||
  STableMergeScanSortSourceParam* source = param;
 | 
					  STableMergeScanSortSourceParam* source = param;
 | 
				
			||||||
  SOperatorInfo*                  pOperator = source->pOperator;
 | 
					  SOperatorInfo*                  pOperator = source->pOperator;
 | 
				
			||||||
  STableMergeScanInfo*            pInfo = pOperator->info;
 | 
					  STableMergeScanInfo*            pInfo = pOperator->info;
 | 
				
			||||||
| 
						 | 
					@ -2784,6 +2784,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
 | 
				
			||||||
    code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
 | 
					    code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
 | 
				
			||||||
    if (code != 0) {
 | 
					    if (code != 0) {
 | 
				
			||||||
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
 | 
					      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
 | 
				
			||||||
 | 
					      qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
 | 
				
			||||||
      T_LONG_JMP(pTaskInfo->env, code);
 | 
					      T_LONG_JMP(pTaskInfo->env, code);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2792,8 +2793,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (isTaskKilled(pTaskInfo)) {
 | 
					    if (isTaskKilled(pTaskInfo)) {
 | 
				
			||||||
 | 
					      qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
 | 
				
			||||||
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
 | 
					      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
 | 
				
			||||||
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
 | 
					      break;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // process this data block based on the probabilities
 | 
					    // process this data block based on the probabilities
 | 
				
			||||||
| 
						 | 
					@ -2806,6 +2808,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
 | 
				
			||||||
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
 | 
					    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
 | 
				
			||||||
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
 | 
					    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
 | 
				
			||||||
    if (code != TSDB_CODE_SUCCESS) {
 | 
					    if (code != TSDB_CODE_SUCCESS) {
 | 
				
			||||||
 | 
					      qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
 | 
				
			||||||
      T_LONG_JMP(pTaskInfo->env, code);
 | 
					      T_LONG_JMP(pTaskInfo->env, code);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2896,7 +2899,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
 | 
				
			||||||
                                          
 | 
					                                          
 | 
				
			||||||
    tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
 | 
					    tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
 | 
					
 | 
				
			||||||
 | 
					  tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // one table has one data block
 | 
					  // one table has one data block
 | 
				
			||||||
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
 | 
					  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1049,12 +1049,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    if (pBlk == NULL) {
 | 
					    if (pBlk == NULL) {
 | 
				
			||||||
      break;
 | 
					      break;
 | 
				
			||||||
    };
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (tsortIsClosed(pHandle)) {
 | 
				
			||||||
 | 
					      tSimpleHashClear(mUidBlk);
 | 
				
			||||||
 | 
					      for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
 | 
				
			||||||
 | 
					        blockDataDestroy(taosArrayGetP(aBlkSort, i));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      taosArrayClear(aBlkSort);
 | 
				
			||||||
 | 
					      break;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  tSimpleHashCleanup(mUidBlk);
 | 
					  tSimpleHashCleanup(mUidBlk);
 | 
				
			||||||
  taosArrayDestroy(aBlkSort);
 | 
					  taosArrayDestroy(aBlkSort);
 | 
				
			||||||
  tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
 | 
					  tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
 | 
				
			||||||
  taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
 | 
					  if (!tsortIsClosed(pHandle)) {
 | 
				
			||||||
 | 
					    taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
  taosArrayDestroy(aExtSrc);
 | 
					  taosArrayDestroy(aExtSrc);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  pHandle->type = SORT_SINGLESOURCE_SORT;
 | 
					  pHandle->type = SORT_SINGLESOURCE_SORT;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue