refactor:do some internal refactor.

This commit is contained in:
Haojun Liao 2023-01-23 01:11:04 +08:00
parent 21d57a3624
commit c1ffbb5c1c
3 changed files with 43 additions and 34 deletions

View File

@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
if (pHandle->currentPage == -1) { if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) { if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno; return terrno;
} }
pPage->num = sizeof(SFilePage); pPage->num = sizeof(SFilePage);
} else { } else {
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage == NULL) { if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno; return terrno;
} }
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) { if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno; return terrno;
} }
pPage->num = sizeof(SFilePage); pPage->num = sizeof(SFilePage);
@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) { if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno; return terrno;
} }
memcpy(pPage->data + pPos->offset, pBuf, length); memcpy(pPage->data + pPos->offset, pBuf, length);

View File

@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if (pg == NULL) { if (pg == NULL) {
return NULL; return NULL;
} }
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes); offset += (int32_t)(pg->num * pMemBucket->bytes);
} }
@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
int32_t *pageId = taosArrayGet(list, 0); int32_t *pageId = taosArrayGet(list, 0);
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) { if (pPage == NULL) {
return TSDB_CODE_NO_AVAIL_DISK; return terrno;
} }
ASSERT(pPage->num == 1); ASSERT(pPage->num == 1);
@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return NULL; return NULL;
} }
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir); int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
if (pSlot->info.data == NULL) { if (pSlot->info.data == NULL) {
return TSDB_CODE_NO_AVAIL_DISK; return terrno;
} }
pSlot->info.pageId = pageId; pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId); taosArrayPush(pPageIdList, &pageId);
@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
// data in buffer and file are merged together to be processed. // data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); SFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
if (buffer == NULL) { if (buffer == NULL) {
return TSDB_CODE_NO_AVAIL_DISK; return terrno;
} }
int32_t currentIdx = count - num; int32_t currentIdx = count - num;
char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
int32_t *pageId = taosArrayGet(list, f); int32_t *pageId = taosArrayGet(list, f);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) { if (pg == NULL) {
return TSDB_CODE_NO_AVAIL_DISK; return terrno;
} }
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);

View File

@ -5,7 +5,7 @@
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES) #define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SPageDiskInfo { typedef struct SPageDiskInfo {
@ -89,7 +89,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba
return data; return data;
} }
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
if (pBuf->pFree == NULL) { if (pBuf->pFree == NULL) {
return pBuf->nextPos; return pBuf->nextPos;
} else { } else {
@ -114,8 +114,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; }
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
/** /**
* +--------------------------+-------------------+--------------+ * +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
@ -124,13 +122,16 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize
* @param pg * @param pg
* @return * @return
*/ */
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
ASSERT(!pg->used && pg->pData != NULL); ASSERT(!pg->used && pg->pData != NULL);
int32_t size = pBuf->pageSize; int32_t size = pBuf->pageSize;
char* t = NULL; char* t = NULL;
if (pg->offset == -1 || pg->dirty) { if (pg->offset == -1 || pg->dirty) {
void* payload = GET_DATA_PAYLOAD(pg); void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf); t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
ASSERTS(size >= 0, "size is negative"); ASSERTS(size >= 0, "size is negative");
} }
@ -140,7 +141,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->offset == -1) { if (pg->offset == -1) {
ASSERTS(pg->dirty == true, "pg->dirty is false"); ASSERTS(pg->dirty == true, "pg->dirty is false");
pg->offset = allocatePositionInFile(pBuf, size); pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size; pBuf->nextPos += size;
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
@ -169,7 +170,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
taosArrayPush(pBuf->pFree, &dinfo); taosArrayPush(pBuf->pFree, &dinfo);
// 2. allocate new position, and update the info // 2. allocate new position, and update the info
pg->offset = allocatePositionInFile(pBuf, size); pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size; pBuf->nextPos += size;
} }
@ -208,9 +209,8 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return pDataBuf; return pDataBuf;
} }
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);
if (pBuf->pFile == NULL) { if (pBuf->pFile == NULL) {
if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) {
@ -219,7 +219,7 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
} }
char* p = doFlushPageToDisk(pBuf, pg); char* p = doFlushBufPage(pBuf, pg);
setPageNotInBuf(pg); setPageNotInBuf(pg);
pg->dirty = false; pg->dirty = false;
return p; return p;
@ -233,7 +233,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return ret; return ret;
} }
void* pPage = (void*)GET_DATA_PAYLOAD(pg); void* pPage = (void*)GET_PAYLOAD_DATA(pg);
ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length); ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
if (ret != pg->length) { if (ret != pg->length) {
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
@ -252,6 +252,10 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) {
pBuf->numOfPages += 1; pBuf->numOfPages += 1;
SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
if (ppi == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
ppi->pageId = pageId; ppi->pageId = pageId;
ppi->pData = NULL; ppi->pData = NULL;
@ -274,17 +278,14 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn); ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn);
if (!pageInfo->used) { if (!pageInfo->used) {
// printf("%d is chosen\n", pageInfo->pageId);
break; break;
} else {
// printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
} }
} }
return pn; return pn;
} }
static char* evacOneDataPage(SDiskbasedBuf* pBuf) { static char* evictBufPage(SDiskbasedBuf* pBuf) {
SListNode* pn = getEldestUnrefedPage(pBuf); SListNode* pn = getEldestUnrefedPage(pBuf);
if (pn == NULL) { // no available buffer pages now, return. if (pn == NULL) { // no available buffer pages now, return.
return NULL; return NULL;
@ -299,7 +300,7 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
d->pn = NULL; d->pn = NULL;
taosMemoryFreeClear(pn); taosMemoryFreeClear(pn);
return flushPageToDisk(pBuf, d); return flushBufPage(pBuf, d);
} }
static void lruListPushFront(SList* pList, SPageInfo* pi) { static void lruListPushFront(SList* pList, SPageInfo* pi) {
@ -374,8 +375,9 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
static char* doExtractPage(SDiskbasedBuf* pBuf) { static char* doExtractPage(SDiskbasedBuf* pBuf) {
char* availablePage = NULL; char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evacOneDataPage(pBuf); availablePage = evictBufPage(pBuf);
if (availablePage == NULL) { if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages)
} }
} else { } else {
@ -409,6 +411,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
// register page id info // register page id info
pi = registerPage(pBuf, *pageId); pi = registerPage(pBuf, *pageId);
if (pi == NULL) {
return NULL;
}
// add to hash map // add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
@ -423,11 +428,15 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset); uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
#endif #endif
return (void*)(GET_DATA_PAYLOAD(pi));
return (void*)(GET_PAYLOAD_DATA(pi));
} }
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
ASSERT(pBuf != NULL && id >= 0); if (id < 0) {
return NULL;
}
pBuf->statis.getPages += 1; pBuf->statis.getPages += 1;
SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
@ -437,7 +446,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
// no need to update the LRU list if only one page exists // no need to update the LRU list if only one page exists
if (pBuf->numOfPages == 1) { if (pBuf->numOfPages == 1) {
(*pi)->used = true; (*pi)->used = true;
return (void*)(GET_DATA_PAYLOAD(*pi)); return (void*)(GET_PAYLOAD_DATA(*pi));
} }
SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
@ -445,10 +454,11 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
lruListMoveToFront(pBuf->lruList, (*pi)); lruListMoveToFront(pBuf->lruList, (*pi));
(*pi)->used = true; (*pi)->used = true;
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
#endif #endif
return (void*)(GET_DATA_PAYLOAD(*pi)); return (void*)(GET_PAYLOAD_DATA(*pi));
} else { // not in memory } else { // not in memory
ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL &&
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
@ -475,14 +485,15 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
#endif #endif
return (void*)(GET_DATA_PAYLOAD(*pi)); return (void*)(GET_PAYLOAD_DATA(*pi));
} }
} }
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) { if (page == NULL) {
return; return;
} }
SPageInfo* ppi = getPageInfoFromPayload(page); SPageInfo* ppi = getPageInfoFromPayload(page);
releaseBufPageInfo(pBuf, ppi); releaseBufPageInfo(pBuf, ppi);
} }
@ -491,6 +502,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset); uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
#endif #endif
if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) { if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) {
return; return;
} }