diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 324011f238..e4081ddf0d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { @@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); @@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } memcpy(pPage->data + pPos->offset, pBuf, length); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 871b668cfd..6b85155486 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) if (pg == NULL) { return NULL; } - memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); + memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); } @@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) { int32_t *pageId = taosArrayGet(list, 0); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); if (pPage == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } ASSERT(pPage->num == 1); @@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, return NULL; } - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir); + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; @@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); if (pSlot->info.data == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } pSlot->info.pageId = pageId; taosArrayPush(pPageIdList, &pageId); @@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction // data in buffer and file are merged together to be processed. SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); if (buffer == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } + int32_t currentIdx = count - num; char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; @@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction int32_t *pageId = taosArrayGet(list, f); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); if (pg == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index f696a02e6e..99f555e20d 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -5,7 +5,7 @@ #include "thash.h" #include "tlog.h" -#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) typedef struct SPageDiskInfo { @@ -89,7 +89,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba return data; } -static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { +static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) { if (pBuf->pFree == NULL) { return pBuf->nextPos; } else { @@ -114,8 +114,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } -static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } - /** * +--------------------------+-------------------+--------------+ * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| @@ -124,13 +122,16 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize * @param pg * @return */ -static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { + +static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } + +static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { ASSERT(!pg->used && pg->pData != NULL); int32_t size = pBuf->pageSize; char* t = NULL; if (pg->offset == -1 || pg->dirty) { - void* payload = GET_DATA_PAYLOAD(pg); + void* payload = GET_PAYLOAD_DATA(pg); t = doCompressData(payload, pBuf->pageSize, &size, pBuf); ASSERTS(size >= 0, "size is negative"); } @@ -140,7 +141,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { if (pg->offset == -1) { ASSERTS(pg->dirty == true, "pg->dirty is false"); - pg->offset = allocatePositionInFile(pBuf, size); + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); @@ -169,7 +170,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { taosArrayPush(pBuf->pFree, &dinfo); // 2. allocate new position, and update the info - pg->offset = allocatePositionInFile(pBuf, size); + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; } @@ -208,9 +209,8 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return pDataBuf; } -static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { +static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; - ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages); if (pBuf->pFile == NULL) { if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { @@ -219,7 +219,7 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } - char* p = doFlushPageToDisk(pBuf, pg); + char* p = doFlushBufPage(pBuf, pg); setPageNotInBuf(pg); pg->dirty = false; return p; @@ -233,7 +233,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return ret; } - void* pPage = (void*)GET_DATA_PAYLOAD(pg); + void* pPage = (void*)GET_PAYLOAD_DATA(pg); ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length); if (ret != pg->length) { ret = TAOS_SYSTEM_ERROR(errno); @@ -252,6 +252,10 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); + if (ppi == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } ppi->pageId = pageId; ppi->pData = NULL; @@ -274,17 +278,14 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn); if (!pageInfo->used) { - // printf("%d is chosen\n", pageInfo->pageId); break; - } else { - // printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); } } return pn; } -static char* evacOneDataPage(SDiskbasedBuf* pBuf) { +static char* evictBufPage(SDiskbasedBuf* pBuf) { SListNode* pn = getEldestUnrefedPage(pBuf); if (pn == NULL) { // no available buffer pages now, return. return NULL; @@ -299,7 +300,7 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) { d->pn = NULL; taosMemoryFreeClear(pn); - return flushPageToDisk(pBuf, d); + return flushBufPage(pBuf, d); } static void lruListPushFront(SList* pList, SPageInfo* pi) { @@ -374,8 +375,9 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem static char* doExtractPage(SDiskbasedBuf* pBuf) { char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); + availablePage = evictBufPage(pBuf); if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) } } else { @@ -409,6 +411,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { // register page id info pi = registerPage(pBuf, *pageId); + if (pi == NULL) { + return NULL; + } // add to hash map taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); @@ -423,11 +428,15 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { #ifdef BUF_PAGE_DEBUG uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset); #endif - return (void*)(GET_DATA_PAYLOAD(pi)); + + return (void*)(GET_PAYLOAD_DATA(pi)); } void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { - ASSERT(pBuf != NULL && id >= 0); + if (id < 0) { + return NULL; + } + pBuf->statis.getPages += 1; SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); @@ -437,7 +446,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { // no need to update the LRU list if only one page exists if (pBuf->numOfPages == 1) { (*pi)->used = true; - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); @@ -445,10 +454,11 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; + #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } else { // not in memory ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); @@ -475,14 +485,15 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } } void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { - if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) { + if (page == NULL) { return; } + SPageInfo* ppi = getPageInfoFromPayload(page); releaseBufPageInfo(pBuf, ppi); } @@ -491,6 +502,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { #ifdef BUF_PAGE_DEBUG uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset); #endif + if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) { return; }