refactor:do some internal refactor.

This commit is contained in:
Haojun Liao 2023-01-24 23:56:29 +08:00
parent 670eec4db5
commit fc80c3d373
1 changed files with 41 additions and 20 deletions

View File

@ -6,6 +6,9 @@
#include "tlog.h" #include "tlog.h"
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) #define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SPageDiskInfo { typedef struct SPageDiskInfo {
@ -14,7 +17,7 @@ typedef struct SPageDiskInfo {
} SPageDiskInfo, SFreeListItem; } SPageDiskInfo, SFreeListItem;
struct SPageInfo { struct SPageInfo {
SListNode* pn; // point to list node struct SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
void* pData; void* pData;
int64_t offset; int64_t offset;
int32_t pageId; int32_t pageId;
@ -112,8 +115,6 @@ static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
} }
} }
static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; }
/** /**
* +--------------------------+-------------------+--------------+ * +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
@ -134,17 +135,18 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t size = pBuf->pageSize; int32_t size = pBuf->pageSize;
char* t = NULL; char* t = NULL;
if (pg->offset == -1 || pg->dirty) { if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
void* payload = GET_PAYLOAD_DATA(pg); void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf); t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
ASSERTS(size >= 0, "size is negative"); if (size < 0) {
uError("failed to compress data when flushing data to disk, %s", pBuf->id);
return NULL;
}
} }
// this page is flushed to disk for the first time // this page is flushed to disk for the first time
if (pg->dirty) { if (pg->dirty) {
if (pg->offset == -1) { if (!HAS_DATA_IN_DISK(pg)) {
ASSERTS(pg->dirty == true, "pg->dirty is false");
pg->offset = allocateNewPositionInFile(pBuf, size); pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size; pBuf->nextPos += size;
@ -160,6 +162,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return NULL; return NULL;
} }
// extend the file size
if (pBuf->fileSize < pg->offset + size) { if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size; pBuf->fileSize = pg->offset + size;
} }
@ -202,13 +205,13 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
size = pg->length; size = pg->length;
} }
ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
char* pDataBuf = pg->pData; char* pDataBuf = pg->pData;
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize)); memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset); uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
#endif #endif
pg->length = size; // on disk size pg->length = size; // on disk size
return pDataBuf; return pDataBuf;
} }
@ -224,13 +227,19 @@ static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
char* p = doFlushBufPage(pBuf, pg); char* p = doFlushBufPage(pBuf, pg);
setPageNotInBuf(pg); CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
pg->dirty = false; pg->dirty = false;
return p; return p;
} }
// load file block data in disk // load file block data in disk
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->offset < 0 || pg->length <= 0) {
uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id);
return TSDB_CODE_INVALID_PARA;
}
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
if (ret == -1) { if (ret == -1) {
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
@ -252,7 +261,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return 0; return 0;
} }
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
pBuf->numOfPages += 1; pBuf->numOfPages += 1;
SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
@ -279,7 +288,9 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListNode* pn = NULL; SListNode* pn = NULL;
while ((pn = tdListNext(&iter)) != NULL) { while ((pn = tdListNext(&iter)) != NULL) {
SPageInfo* pageInfo = *(SPageInfo**)pn->data; SPageInfo* pageInfo = *(SPageInfo**)pn->data;
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn);
SPageInfo* p = *(SPageInfo**)(pageInfo->pData);
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo);
if (!pageInfo->used) { if (!pageInfo->used) {
break; break;
@ -299,7 +310,6 @@ static char* evictBufPage(SDiskbasedBuf* pBuf) {
tdListPopNode(pBuf->lruList, pn); tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**)pn->data; SPageInfo* d = *(SPageInfo**)pn->data;
ASSERTS(d->pn == pn, "d->pn not equal pn");
d->pn = NULL; d->pn = NULL;
taosMemoryFreeClear(pn); taosMemoryFreeClear(pn);
@ -422,7 +432,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
*pageId = (++pBuf->allocateId); *pageId = (++pBuf->allocateId);
// register page id info // register page id info
pi = registerPage(pBuf, *pageId); pi = registerNewPageInfo(pBuf, *pageId);
if (pi == NULL) { if (pi == NULL) {
return NULL; return NULL;
} }
@ -446,15 +456,21 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
if (id < 0) { if (id < 0) {
terrno = TSDB_CODE_INVALID_PARA;
uError("invalid page id:%d, %s", id, pBuf->id);
return NULL; return NULL;
} }
pBuf->statis.getPages += 1; pBuf->statis.getPages += 1;
SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
ASSERT(pi != NULL && *pi != NULL); if (pi == NULL || *pi == NULL) {
uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if ((*pi)->pData != NULL) { // it is in memory if (BUF_PAGE_IN_MEM(*pi)) { // it is in memory
// no need to update the LRU list if only one page exists // no need to update the LRU list if only one page exists
if (pBuf->numOfPages == 1) { if (pBuf->numOfPages == 1) {
(*pi)->used = true; (*pi)->used = true;
@ -462,7 +478,10 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
} }
SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
ASSERT(*pInfo == *pi); if (*pInfo != *pi) {
uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id);
return NULL;
}
lruListMoveToFront(pBuf->lruList, (*pi)); lruListMoveToFront(pBuf->lruList, (*pi));
(*pi)->used = true; (*pi)->used = true;
@ -472,10 +491,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
#endif #endif
return (void*)(GET_PAYLOAD_DATA(*pi)); return (void*)(GET_PAYLOAD_DATA(*pi));
} else { // not in memory } else { // not in memory
ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
(*pi)->pData = doExtractPage(pBuf); (*pi)->pData = doExtractPage(pBuf);
// failed to evict buffer page, return with error code.
if ((*pi)->pData == NULL) { if ((*pi)->pData == NULL) {
return NULL; return NULL;
} }
@ -487,7 +508,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(*pi)->used = true; (*pi)->used = true;
// some data has been flushed to disk, and needs to be loaded into buffer again. // some data has been flushed to disk, and needs to be loaded into buffer again.
if ((*pi)->length > 0 && (*pi)->offset >= 0) { if (HAS_DATA_IN_DISK(*pi)) {
int32_t code = loadPageFromDisk(pBuf, *pi); int32_t code = loadPageFromDisk(pBuf, *pi);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;