diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index 708c6cf741..c1c246dd64 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -55,7 +55,7 @@ typedef struct SDiskbasedBufStatis { * @param handle * @return */ -int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); /** * @@ -108,13 +108,13 @@ size_t getTotalBufSize(const SDiskbasedBuf* pBuf); * @param pBuf * @return */ -size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf); +size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf); /** * destroy result buffer * @param pBuf */ -void destroyResultBuf(SDiskbasedBuf* pBuf); +void destroyDiskbasedBuf(SDiskbasedBuf* pBuf); /** * diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c5706c7e59..c7111f987d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -627,7 +627,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, void* param, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); // SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); // SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 75ba620738..c7c72e615d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2215,7 +2215,7 @@ static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) { destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pQueryAttr->numOfOutput); // destroyUdfInfo(pRuntimeEnv->pUdfInfo); - destroyResultBuf(pRuntimeEnv->pResultBuf); + destroyDiskbasedBuf(pRuntimeEnv->pResultBuf); doFreeQueryHandle(pRuntimeEnv); destroyTsComp(pRuntimeEnv, pQueryAttr); @@ -4629,7 +4629,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; - int32_t code = createDiskbasedBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, tsTempDir); + int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, tsTempDir); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5600,6 +5600,10 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { } static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { + SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; + taosArrayDestroy(pInfo->orderInfo); + destroySortHandle(pInfo->pSortHandle); + blockDataDestroy(pInfo->pDataBlock); } static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { @@ -5655,7 +5659,10 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB } SSDataBlock* loadNextDataBlock(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + bool newgroup = false; + return pOperator->exec(pOperator, &newgroup); } static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { @@ -5672,14 +5679,17 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize, + pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTIWAY_MERGE, pInfo->bufPageSize, numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); tfree(p); setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - sortAddSource(pInfo->pSortHandle, pOperator->pDownstream[i]); + SOperatorSource* ps = calloc(1, sizeof(SOperatorSource)); + ps->param = pOperator->pDownstream[i]; + + sortAddSource(pInfo->pSortHandle, ps); } int32_t code = sortOpen(pInfo->pSortHandle); @@ -5714,7 +5724,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { return pOrderInfo; } -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, void* param, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1d42f1f838..196e22d92d 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -39,7 +39,7 @@ typedef struct SSortHandle { bool nullFirst; bool hasVarCol; - SArray *pSources; + SArray *pSources; // TODO refactor, remove it SArray *pOrderedSource; _sort_fetch_block_fn_t fetchfp; @@ -119,6 +119,7 @@ void destroySortHandle(SSortHandle* pSortHandle) { tMergeTreeDestroy(pSortHandle->pMergeTree); } + destroyDiskbasedBuf(pSortHandle->pBuf); tfree(pSortHandle->idStr); tfree(pSortHandle); } @@ -171,7 +172,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { int32_t start = 0; if (pHandle->pBuf == NULL) { - int32_t code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); + int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); setPrintStatis(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; @@ -235,7 +236,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int } else { // multi-pass internal merge sort is required if (pHandle->pBuf == NULL) { - code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); + code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); setPrintStatis(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; @@ -614,7 +615,10 @@ int32_t sortOpen(SSortHandle* pHandle) { } int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); - ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); + if (pHandle->pBuf != NULL) { + ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); + } + code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 160bf1f7ca..2651b95b0f 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -35,10 +35,17 @@ namespace { +enum { + data_rand = 0x1, + data_asc = 0x2, + data_desc = 0x3, +}; + typedef struct SDummyInputInfo { int32_t max; int32_t current; int32_t startVal; + int32_t type; SSDataBlock* pBlock; } SDummyInputInfo; @@ -83,10 +90,18 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { char buf[128] = {0}; char b1[128] = {0}; + int32_t v = 0; for(int32_t i = 0; i < numOfRows; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); - int32_t v = (--pInfo->startVal); + if (pInfo->type == data_desc) { + v = (--pInfo->startVal); + } else if (pInfo->type == data_asc) { + v = ++pInfo->startVal; + } else if (pInfo->type == data_rand) { + v = random(); + } + colDataAppend(pColInfo, i, reinterpret_cast(&v), false); // sprintf(buf, "this is %d row", i); @@ -103,7 +118,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { return pBlock; } -SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { +SOperatorInfo* createDummyOperator(int32_t numOfBlocks, int32_t type) { SOperatorInfo* pOperator = static_cast(calloc(1, sizeof(SOperatorInfo))); pOperator->name = "dummyInputOpertor4Test"; pOperator->exec = getDummyBlock; @@ -111,6 +126,7 @@ SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); pInfo->max = numOfBlocks; pInfo->startVal = 1500000; + pInfo->type = type; pOperator->info = pInfo; return pOperator; @@ -121,6 +137,7 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } +#if 0 TEST(testCase, build_executor_tree_Test) { const char* msg = "{\n" "\t\"Id\":\t{\n" @@ -216,34 +233,34 @@ TEST(testCase, build_executor_tree_Test) { // int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); } -//TEST(testCase, inMem_sort_Test) { -// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); -// SOrder o = {.order = TSDB_ORDER_ASC}; -// o.col.info.colId = 1; -// o.col.info.type = TSDB_DATA_TYPE_INT; -// taosArrayPush(pOrderVal, &o); -// -// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); -// SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); -// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); -// taosArrayPush(pExprInfo, &exp); -// -// SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); -// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); -// taosArrayPush(pExprInfo, &exp1); -// -// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal); -// -// bool newgroup = false; -// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup); -// -// SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); -// SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); -// for(int32_t i = 0; i < pRes->info.rows; ++i) { -// char* p = colDataGet(pCol2, i); -// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); -// } -//} +TEST(testCase, inMem_sort_Test) { + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {.order = TSDB_ORDER_ASC}; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); + SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); + exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); + taosArrayPush(pExprInfo, &exp); + + SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); + exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); + taosArrayPush(pExprInfo, &exp1); + + SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal, NULL); + + bool newgroup = false; + SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup); + + SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); + SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); + for(int32_t i = 0; i < pRes->info.rows; ++i) { + char* p = colDataGet(pCol2, i); + printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); + } +} typedef struct su { int32_t v; @@ -339,6 +356,7 @@ TEST(testCase, external_sort_Test) { taosArrayDestroy(pExprInfo); taosArrayDestroy(pOrderVal); } +#endif TEST(testCase, sorted_merge_Test) { srand(time(NULL)); @@ -359,7 +377,13 @@ TEST(testCase, sorted_merge_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); // taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal, NULL); + int32_t numOfSources = 10; + SOperatorInfo** plist = (SOperatorInfo**) calloc(numOfSources, sizeof(void*)); + for(int32_t i = 0; i < numOfSources; ++i) { + plist[i] = createDummyOperator(1, data_asc); + } + + SOperatorInfo* pOperator = createSortedMergeOperatorInfo(plist, numOfSources, pExprInfo, pOrderVal, NULL); bool newgroup = false; SSDataBlock* pRes = NULL; diff --git a/source/libs/executor/test/executorUtilTests.cpp b/source/libs/executor/test/executorUtilTests.cpp index 57691d6d11..5c94b89c3d 100644 --- a/source/libs/executor/test/executorUtilTests.cpp +++ b/source/libs/executor/test/executorUtilTests.cpp @@ -219,50 +219,50 @@ int32_t docomp(const void* p1, const void* p2, void* param) { // destroySortHandle(phandle); //} -TEST(testCase, ordered_merge_sort_Test) { - SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); - SOrder o = {.order = TSDB_ORDER_ASC}; - o.col.info.colId = 1; - o.col.info.type = TSDB_DATA_TYPE_INT; - taosArrayPush(pOrderVal, &o); - - int32_t numOfRows = 1000; - SBlockOrderInfo oi = {0}; - oi.order = TSDB_ORDER_ASC; - oi.colIndex = 0; - SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); - taosArrayPush(orderInfo, &oi); - - SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; - SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc"); - setFetchRawDataFp(phandle, getSingleColDummyBlock); - setComparFn(phandle, docomp); - - for(int32_t i = 0; i < 10; ++i) { - SOperatorSource* p = static_cast(calloc(1, sizeof(SOperatorSource))); - _info* c = static_cast<_info*>(calloc(1, sizeof(_info))); - c->count = 1; - c->pageRows = 1000; - c->startVal = 0; - - p->param = c; - sortAddSource(phandle, p); - } - - int32_t code = sortOpen(phandle); - int32_t row = 1; - - while(1) { - STupleHandle* pTupleHandle = sortNextTuple(phandle); - if (pTupleHandle == NULL) { - break; - } - - void* v = sortGetValue(pTupleHandle, 0); - printf("%d: %d\n", row++, *(int32_t*) v); - - } - destroySortHandle(phandle); -} +//TEST(testCase, ordered_merge_sort_Test) { +// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); +// SOrder o = {.order = TSDB_ORDER_ASC}; +// o.col.info.colId = 1; +// o.col.info.type = TSDB_DATA_TYPE_INT; +// taosArrayPush(pOrderVal, &o); +// +// int32_t numOfRows = 1000; +// SBlockOrderInfo oi = {0}; +// oi.order = TSDB_ORDER_ASC; +// oi.colIndex = 0; +// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); +// taosArrayPush(orderInfo, &oi); +// +// SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; +// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc"); +// setFetchRawDataFp(phandle, getSingleColDummyBlock); +// setComparFn(phandle, docomp); +// +// for(int32_t i = 0; i < 10; ++i) { +// SOperatorSource* p = static_cast(calloc(1, sizeof(SOperatorSource))); +// _info* c = static_cast<_info*>(calloc(1, sizeof(_info))); +// c->count = 1; +// c->pageRows = 1000; +// c->startVal = 0; +// +// p->param = c; +// sortAddSource(phandle, p); +// } +// +// int32_t code = sortOpen(phandle); +// int32_t row = 1; +// +// while(1) { +// STupleHandle* pTupleHandle = sortNextTuple(phandle); +// if (pTupleHandle == NULL) { +// break; +// } +// +// void* v = sortGetValue(pTupleHandle, 0); +// printf("%d: %d\n", row++, *(int32_t*) v); +// +// } +// destroySortHandle(phandle); +//} #pragma GCC diagnostic pop diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 40731adc58..c6ab125362 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, resetSlotInfo(pBucket); - int32_t ret = createDiskbasedBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; @@ -269,7 +269,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) { return; } - destroyResultBuf(pBucket->pBuffer); + destroyDiskbasedBuf(pBucket->pBuffer); tfree(pBucket->pSlots); tfree(pBucket); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index d5415c219f..f014d8e07e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -53,7 +53,7 @@ typedef struct SDiskbasedBuf { static void printStatisData(const SDiskbasedBuf* pBuf); -int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { *pBuf = calloc(1, sizeof(SDiskbasedBuf)); SDiskbasedBuf* pResBuf = *pBuf; @@ -473,7 +473,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { pBuf->statis.releasePages += 1; } -size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); } +size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); } size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } @@ -488,7 +488,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { } } -void destroyResultBuf(SDiskbasedBuf* pBuf) { +void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { if (pBuf == NULL) { return; } diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp index 8fa8216223..d310ff450b 100644 --- a/source/util/test/pageBufferTest.cpp +++ b/source/util/test/pageBufferTest.cpp @@ -13,7 +13,7 @@ namespace { // simple test void simpleTest() { SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4096, 1, "/tmp/"); + int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, 1, "/tmp/"); int32_t pageId = 0; int32_t groupId = 0; @@ -25,7 +25,7 @@ void simpleTest() { SIDList list = getDataBufPagesIdList(pResultBuf, groupId); ASSERT_EQ(taosArrayGetSize(list), 1); - ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1); + ASSERT_EQ(getNumOfBufGroupId(pResultBuf), 1); releaseBufPage(pResultBuf, pBufPage); @@ -50,12 +50,12 @@ void simpleTest() { SFilePage* t4 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t4 == pBufPage5); - destroyResultBuf(pResultBuf); + destroyDiskbasedBuf(pResultBuf); } void writeDownTest() { SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); + int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); int32_t pageId = 0; int32_t writePageId = 0; @@ -97,12 +97,12 @@ void writeDownTest() { SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 5); - destroyResultBuf(pResultBuf); + destroyDiskbasedBuf(pResultBuf); } void recyclePageTest() { SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuffer(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); + int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); int32_t pageId = 0; int32_t writePageId = 0; @@ -150,7 +150,7 @@ void recyclePageTest() { SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 6); - destroyResultBuf(pResultBuf); + destroyDiskbasedBuf(pResultBuf); } } // namespace