From b0ecf209759c3b67400b7f938f1bc16cd789f389 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Feb 2022 18:01:21 +0800 Subject: [PATCH] [td-11818] fix memory leak. --- include/common/common.h | 2 - include/common/tep.h | 1 + include/util/tlosertree.h | 2 + source/common/src/tep.c | 60 +++++++++++-- source/common/src/tname.c | 22 ----- source/common/test/commonTests.cpp | 2 +- source/libs/executor/inc/executorimpl.h | 12 ++- source/libs/executor/src/executorimpl.c | 111 ++++++++++++++---------- source/util/src/tlosertree.c | 17 +++- source/util/src/tpagedfile.c | 2 - 10 files changed, 141 insertions(+), 90 deletions(-) diff --git a/include/common/common.h b/include/common/common.h index c33689d66f..cd3c845f53 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -136,8 +136,6 @@ typedef struct SSessionWindow { #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) -void *destroySDataBlock(SSDataBlock *pBlock); - #ifdef __cplusplus } #endif diff --git a/include/common/tep.h b/include/common/tep.h index 7d81b829bc..5913e26057 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -57,6 +57,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol); +void *blockDataDestroy(SSDataBlock *pBlock); #ifdef __cplusplus } diff --git a/include/util/tlosertree.h b/include/util/tlosertree.h index 7be4270dcb..736135f7e9 100644 --- a/include/util/tlosertree.h +++ b/include/util/tlosertree.h @@ -32,6 +32,8 @@ typedef struct SMultiwayMergeTreeInfo { int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); +void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree); + void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx); void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree); diff --git a/source/common/src/tep.c b/source/common/src/tep.c index a5a9e8dac7..cc571411e6 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -166,7 +166,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con char* buf = realloc(pColumnInfoData->pData, newSize); if (buf == NULL) { - // TODO handle the malloc failure. + return TSDB_CODE_OUT_OF_MEMORY; } pColumnInfoData->pData = buf; @@ -621,7 +621,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { return 0; } -static void doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) { +static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) { + int32_t code = 0; int32_t numOfCols = pSrcBlock->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { @@ -630,18 +631,29 @@ static void doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const bool isNull = colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, NULL); if (isNull) { - colDataAppend(pDst, numOfRows, NULL, true); + code = colDataAppend(pDst, numOfRows, NULL, true); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { char* p = colDataGet((SColumnInfoData*)pSrc, tupleIndex); - colDataAppend(pDst, numOfRows, p, false); + code = colDataAppend(pDst, numOfRows, p, false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } -static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, int32_t* index) { +static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, int32_t* index) { for (int32_t i = 0; i < pDataBlock->info.rows; ++i) { - doAssignOneTuple(pCols, i, pDataBlock, index[i]); + int32_t code = doAssignOneTuple(pCols, i, pDataBlock, index[i]); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } + + return TSDB_CODE_SUCCESS; } static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { @@ -668,7 +680,7 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { return pCols; } -static int32_t copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { +static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { int32_t numOfCols = pDataBlock->info.numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { @@ -742,7 +754,12 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs #endif int64_t p2 = taosGetTimestampUs(); - blockDataAssign(pCols, pDataBlock, index); + int32_t code = blockDataAssign(pCols, pDataBlock, index); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; + } + int64_t p3 = taosGetTimestampUs(); #if 0 @@ -762,6 +779,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3); destroyTupleIndex(index); + + return TSDB_CODE_SUCCESS; } void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) { @@ -810,4 +829,29 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { p->pData = tmp; } } + + return TSDB_CODE_SUCCESS; +} + +void* blockDataDestroy(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return NULL; + } + + int32_t numOfOutput = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + tfree(pColInfoData->varmeta.offset); + } else { + tfree(pColInfoData->nullbitmap); + } + + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + tfree(pBlock); + return NULL; } \ No newline at end of file diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 4d606b0bdf..f3deb84ccf 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -271,25 +271,3 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam return s; } -void* destroySDataBlock(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return NULL; - } - - int32_t numOfOutput = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - tfree(pColInfoData->varmeta.offset); - } else { - tfree(pColInfoData->nullbitmap); - } - - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - tfree(pBlock); - return NULL; -} diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index 13e053b034..e9e8d086b3 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -170,7 +170,7 @@ TEST(testCase, Datablock_test) { taosArrayPush(pOrderInfo, &order); blockDataSort(b, pOrderInfo, true); - destroySDataBlock(b); + blockDataDestroy(b); taosArrayDestroy(pOrderInfo); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 25877468b4..3783608d97 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -557,19 +557,25 @@ typedef struct SMultiwayMergeInfo { bool multiGroupResults; } SMultiwayMergeInfo; +typedef struct SMsortComparParam { + struct SExternalMemSource **pSources; + int32_t numOfSources; + SArray *orderInfo; // SArray + bool nullFirst; +} SMsortComparParam; + typedef struct SOrderOperatorInfo { int32_t sourceId; uint32_t sortBufSize; // max buffer size for in-memory sort - SArray *orderInfo; // SArray SSDataBlock *pDataBlock; - bool nullFirst; // null value is put in the front bool hasVarCol; // has variable length column, such as binary/varchar/nchar - int32_t numOfSources; int32_t numOfCompleted; SDiskbasedBuf *pSortInternalBuf; SMultiwayMergeTreeInfo *pMergeTree; SArray *pSources; // SArray int32_t capacity; + + SMsortComparParam cmpParam; } SOrderOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0e12963250..d3e22b33f0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5423,7 +5423,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); - pInfo->pRes = destroySDataBlock(pInfo->pRes); + pInfo->pRes = blockDataDestroy(pInfo->pRes); tfree(pInfo->prevRow); } @@ -5543,13 +5543,6 @@ typedef struct SExternalMemSource { SSDataBlock *pBlock; } SExternalMemSource; -typedef struct SMsortComparParam { - SExternalMemSource **pSources; - int32_t num; - SArray *orderInfo; // SArray - bool nullFirst; -} SMsortComparParam; - int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pRightIdx = *(int32_t *)pRight; @@ -5692,8 +5685,11 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { } taosArrayPush(pInfo->pSources, &pSource); - pInfo->sourceId += 1; + pInfo->sourceId += 1; + pInfo->cmpParam.numOfSources += 1; + + ASSERT(pInfo->cmpParam.numOfSources == taosArrayGetSize(pInfo->pSources)); return blockDataEnsureCapacity(pSource->pBlock, pInfo->capacity); } @@ -5712,6 +5708,7 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); blockDataToBuf(pPage->data, p); + blockDataDestroy(p); start = stop + 1; } @@ -5725,12 +5722,9 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { } static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorInfo* pInfo) { - cmpParam->nullFirst = pInfo->nullFirst; - cmpParam->orderInfo = pInfo->orderInfo; - cmpParam->num = pInfo->numOfSources; cmpParam->pSources = pInfo->pSources->pData; - for(int32_t i = 0; i < pInfo->numOfSources; ++i) { + for(int32_t i = 0; i < pInfo->cmpParam.numOfSources; ++i) { SExternalMemSource* pSource = cmpParam->pSources[i]; SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -5740,6 +5734,34 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI return code; } } + + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam) { + blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); + + while(1) { + if (pInfo->cmpParam.numOfSources == pInfo->numOfCompleted) { + break; + } + + int32_t index = tMergeTreeGetChosenIndex(pInfo->pMergeTree); + + SExternalMemSource *pSource = (*cmpParam).pSources[index]; + appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex); + + int32_t code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + if (pInfo->pDataBlock->info.rows >= pInfo->capacity) { + return pInfo->pDataBlock; + } + } + + return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } static SSDataBlock* doSort(void* param, bool* newgroup) { @@ -5752,6 +5774,10 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SOrderOperatorInfo* pInfo = pOperator->info; SSDataBlock* pBlock = NULL; + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam); + } + while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); @@ -5771,7 +5797,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { if (size > pInfo->sortBufSize) { // Perform the in-memory sort and then flush data in the buffer into disk. int64_t p = taosGetTimestampUs(); - blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); + blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst); printf("sort time:%ld\n", taosGetTimestampUs() - p); addToDiskbasedBuf(pInfo, pTaskInfo->env); @@ -5779,50 +5805,31 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { } if (pInfo->pDataBlock->info.rows > 0) { - pInfo->numOfSources += 1; - // Perform the in-memory sort and then flush data in the buffer into disk. - blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); + blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst); // All sorted data are resident in memory, external memory sort is not needed. // Return to the upstream operator directly if (isAllDataInMemBuf(pInfo->pSortInternalBuf)) { - pOperator->status = OP_RES_TO_RETURN; + pOperator->status = OP_EXEC_DONE; return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock; } addToDiskbasedBuf(pInfo, pTaskInfo->env); } - SMsortComparParam cmpParam = {0}; - int32_t code = sortComparInit(&cmpParam, pInfo); + int32_t code = sortComparInit(&pInfo->cmpParam, pInfo); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } - code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->numOfSources, &cmpParam, msortComparFn); + code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } - while(1) { - if (pInfo->numOfSources == pInfo->numOfCompleted) { - break; - } - - SExternalMemSource *pSource = cmpParam.pSources[tMergeTreeGetChosenIndex(pInfo->pMergeTree)]; - appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex); - code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } - - if (pInfo->pDataBlock->info.rows >= pInfo->capacity) { - return pInfo->pDataBlock; - } - } - - return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam); } static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { @@ -5852,10 +5859,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); pInfo->sortBufSize = 1024 * 1024; // 1MB - pInfo->capacity = 4096; - pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity); - pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); - pInfo->pSources = taosArrayInit(4, POINTER_BYTES); + pInfo->capacity = 4096; + pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity); + pInfo->pSources = taosArrayInit(4, POINTER_BYTES); + pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal); for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); @@ -5865,6 +5872,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI } } + // TODO check error int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, 4096, 4096*1000, 1, "/tmp/"); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6850,7 +6858,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { tfree(pInfo->rowCellInfoOffset); cleanupResultRowInfo(&pInfo->resultRowInfo); - pInfo->pRes = destroySDataBlock(pInfo->pRes); + pInfo->pRes = blockDataDestroy(pInfo->pRes); } static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { @@ -6874,7 +6882,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); - pInfo->pRes = destroySDataBlock(pInfo->pRes); + pInfo->pRes = blockDataDestroy(pInfo->pRes); tfree(pInfo->p); } @@ -6891,12 +6899,19 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; - pInfo->pRes = destroySDataBlock(pInfo->pRes); + pInfo->pRes = blockDataDestroy(pInfo->pRes); } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; - pInfo->pDataBlock = destroySDataBlock(pInfo->pDataBlock); + pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); + + taosArrayDestroy(pInfo->cmpParam.orderInfo); + destroyResultBuf(pInfo->pSortInternalBuf); + + tMergeTreeDestroy(pInfo->pMergeTree); + +// for(int32_t i = 0; i < pInfo->cmpParam.pSources) } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -6909,7 +6924,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { taosHashCleanup(pInfo->pSet); tfree(pInfo->buf); taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = destroySDataBlock(pInfo->pRes); + pInfo->pRes = blockDataDestroy(pInfo->pRes); } SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index 7ccd49a64e..53dabbdbdd 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -14,8 +14,9 @@ */ #include "os.h" -#include "tlosertree.h" #include "ulog.h" +#include "tlosertree.h" +#include "taoserror.h" typedef struct STreeNode { int32_t index; @@ -39,12 +40,12 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, int32_t totalEntries = numOfSources << 1u; SMultiwayMergeTreeInfo* pTreeInfo = (SMultiwayMergeTreeInfo*)calloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries); - if ((*pTree) == NULL) { + if (pTreeInfo == NULL) { uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); - return -1; + return TAOS_SYSTEM_ERROR(errno); } - pTreeInfo->pNode = (STreeNode*)(((char*)(*pTree)) + sizeof(SMultiwayMergeTreeInfo)); + pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo)); pTreeInfo->numOfSources = numOfSources; pTreeInfo->totalSources = totalEntries; @@ -73,6 +74,14 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, return 0; } +void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree) { + if (pTree == NULL) { + return; + } + + tfree(pTree); +} + void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) { assert(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2); diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index 3e9b99cb21..2d8fdb1fc3 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -510,5 +510,3 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) { bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) { return pResultBuf->fileSize == 0; } - -