util simplehash/tpagedbuf/losertree add ret check

This commit is contained in:
wangjiaming0909 2024-07-25 14:49:47 +08:00
parent be04bfa1ce
commit 6d283470fc
8 changed files with 94 additions and 43 deletions

View File

@ -156,7 +156,7 @@ void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp);
* @param pBuf
* @param pageId
*/
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage);
int32_t dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage);
/**
* Print the statistics when closing this buffer

View File

@ -152,7 +152,8 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
if (pLast->num <= sizeof(SFilePage)) {
// this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
// TODO check ret
(void)dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
return;
@ -178,7 +179,8 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
pStart += nodeSize;
if (pLast->num <= sizeof(SFilePage)) {
// this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
// TODO check ret
(void)dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
releaseBufPage(pHashObj->pBuf, pFirst);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
break;

View File

@ -550,7 +550,7 @@ static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pSt
code = createFunction(funcMgtBuiltins[pFunc->funcId].pStateFunc, pParams, pStateFunc);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pParams);
return terrno;
return code;
}
(void)strcpy((*pStateFunc)->node.aliasName, pFunc->node.aliasName);
(void)strcpy((*pStateFunc)->node.userAlias, pFunc->node.userAlias);

View File

@ -1768,7 +1768,7 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
int32_t nodesListStrictAppend(SNodeList* pList, SNode* pNode) {
if (NULL == pNode) {
return terrno;
return TSDB_CODE_INVALID_PARA;
}
int32_t code = nodesListAppend(pList, pNode);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -163,4 +163,4 @@ SListNode *tdListNext(SListIter *pIter) {
}
return node;
}
}

View File

@ -125,10 +125,10 @@ void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) {
* display whole loser tree on screen for debug purpose only.
*/
void tMergeTreePrint(const SMultiwayMergeTreeInfo* pTree) {
printf("the value of loser tree:\t");
(void)printf("the value of loser tree:\t");
for (int32_t i = 0; i < pTree->totalSources; ++i) {
printf("%d\t", pTree->pNode[i].index);
(void)printf("%d\t", pTree->pNode[i].index);
}
printf("\n");
(void)printf("\n");
}

View File

@ -82,17 +82,20 @@ static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbase
return data;
}
static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) { // do nothing
static int32_t doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) { // do nothing
int32_t code = 0;
if (!pBuf->comp) {
*dst = srcSize;
return data;
return code;
}
*dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
if (*dst > 0) {
memcpy(data, pBuf->assistBuf, *dst);
} else if (*dst < 0) {
return terrno;
}
return data;
return code;;
}
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
@ -189,7 +192,9 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->length < size) {
// 1. add current space to free list
SPageDiskInfo dinfo = {.length = pg->length, .offset = offset};
taosArrayPush(pBuf->pFree, &dinfo);
if (NULL == taosArrayPush(pBuf->pFree, &dinfo)) {
return NULL;
}
// 2. allocate new position, and update the info
offset = allocateNewPositionInFile(pBuf, size);
@ -258,8 +263,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
pBuf->statis.loadPages += 1;
int32_t fullSize = 0;
doDecompressData(pPage, pg->length, &fullSize, pBuf);
return 0;
return doDecompressData(pPage, pg->length, &fullSize, pBuf);
}
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
@ -279,7 +283,12 @@ static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
ppi->pn = NULL;
ppi->dirty = false;
return *(SPageInfo**)taosArrayPush(pBuf->pIdList, &ppi);
SPageInfo** pRet = taosArrayPush(pBuf->pIdList, &ppi);
if (NULL == pRet) {
taosMemoryFree(ppi);
return NULL;
}
return *pRet;
}
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
@ -309,7 +318,7 @@ static char* evictBufPage(SDiskbasedBuf* pBuf) {
}
terrno = 0;
tdListPopNode(pBuf->lruList, pn);
(void)tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**)pn->data;
@ -319,14 +328,18 @@ static char* evictBufPage(SDiskbasedBuf* pBuf) {
return flushBufPage(pBuf, d);
}
static void lruListPushFront(SList* pList, SPageInfo* pi) {
tdListPrepend(pList, &pi);
static int32_t lruListPushFront(SList* pList, SPageInfo* pi) {
int32_t code = tdListPrepend(pList, &pi);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
SListNode* front = tdListGetHead(pList);
pi->pn = front;
return TSDB_CODE_SUCCESS;
}
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
tdListPopNode(pList, pi->pn);
(void)tdListPopNode(pList, pi->pn);
tdListPrependNode(pList, pi->pn);
}
@ -421,12 +434,22 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
SPageInfo* pi = NULL;
int32_t code = 0;
if (listNEles(pBuf->freePgList) != 0) {
SListNode* pItem = tdListPopHead(pBuf->freePgList);
pi = *(SPageInfo**)pItem->data;
pi->used = true;
*pageId = pi->pageId;
taosMemoryFreeClear(pItem);
code = lruListPushFront(pBuf->lruList, pi);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(pi);
if (newPage) {
taosMemoryFree(availablePage);
}
terrno = code;
return NULL;
}
} else { // create a new pageinfo
// register new id in this group
*pageId = (++pBuf->allocateId);
@ -441,12 +464,24 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
// add to hash map
tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
pBuf->totalBufSize += pBuf->pageSize;
int32_t code = tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) {
// add to LRU list
code = lruListPushFront(pBuf->lruList, pi);
}
if (TSDB_CODE_SUCCESS == code) {
pBuf->totalBufSize += pBuf->pageSize;
} else {
if (newPage) taosMemoryFree(availablePage);
(void)taosArrayPop(pBuf->pIdList);
(void)tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t));
taosMemoryFree(pi);
terrno = code;
return NULL;
}
}
// add to LRU list
lruListPushFront(pBuf->lruList, pi);
pi->pData = availablePage;
((void**)pi->pData)[0] = pi;
@ -509,7 +544,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
// set the ptr to the new SPageInfo
((void**)((*pi)->pData))[0] = (*pi);
lruListPushFront(pBuf->lruList, *pi);
int32_t code = lruListPushFront(pBuf->lruList, *pi);
if (TSDB_CODE_SUCCESS != code) {
if (newPage) taosMemoryFree((*pi)->pData);
terrno = code;
return NULL;
}
(*pi)->used = true;
// some data has been flushed to disk, and needs to be loaded into buffer again.
@ -578,7 +618,10 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
taosCloseFile(&pBuf->pFile);
int32_t code = taosCloseFile(&pBuf->pFile);
if (TSDB_CODE_SUCCESS != code) {
uDebug("WARNING tPage failed to close file when destroy disk basebuf: %s", pBuf->path);
}
} else {
uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
}
@ -615,8 +658,8 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosArrayDestroy(pBuf->pIdList);
tdListFree(pBuf->lruList);
tdListFree(pBuf->freePgList);
(void)tdListFree(pBuf->lruList);
(void)tdListFree(pBuf->freePgList);
taosArrayDestroy(pBuf->emptyDummyIdList);
taosArrayDestroy(pBuf->pFree);
@ -654,9 +697,14 @@ void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
}
}
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
int32_t dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
SPageInfo* ppi = getPageInfoFromPayload(pPage);
int32_t code = tdListAppend(pBuf->freePgList, &ppi);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
ppi->used = false;
ppi->dirty = false;
@ -665,8 +713,7 @@ void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
taosMemoryFreeClear(ppi->pData);
taosMemoryFreeClear(pNode);
ppi->pn = NULL;
tdListAppend(pBuf->freePgList, &ppi);
return TSDB_CODE_SUCCESS;
}
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; }
@ -689,7 +736,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
#endif
if (ps->loadPages > 0) {
printf(
(void)printf(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
"Kb\n",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,

View File

@ -66,7 +66,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
SSHashObj *pHashObj = (SSHashObj *)taosMemoryMalloc(sizeof(SSHashObj));
if (!pHashObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -82,7 +81,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) {
taosMemoryFree(pHashObj);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -131,7 +129,6 @@ static SHNode *doCreateHashNode(SSHashObj *pHashObj, const void *key, size_t key
uint32_t hashVal) {
SHNode *pNewNode = doInternalAlloc(pHashObj, sizeof(SHNode) + keyLen + dataLen);
if (!pNewNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -148,23 +145,24 @@ static SHNode *doCreateHashNode(SSHashObj *pHashObj, const void *key, size_t key
return pNewNode;
}
static void tSimpleHashTableResize(SSHashObj *pHashObj) {
static int32_t tSimpleHashTableResize(SSHashObj *pHashObj) {
int32_t code = 0;
if (!SHASH_NEED_RESIZE(pHashObj)) {
return;
return code;
}
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
uDebug("current capacity:%" PRIzu ", maximum capacity:%" PRId32 ", no resize applied due to limitation is reached",
pHashObj->capacity, (int32_t)HASH_MAX_CAPACITY);
return;
return code;
}
// int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, POINTER_BYTES * newCapacity);
if (!pNewEntryList) {
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
return terrno;
}
size_t inc = newCapacity - pHashObj->capacity;
@ -206,18 +204,22 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms",
// (int32_t)pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
return code;
}
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen) {
if (!pHashObj || !key) {
return -1;
return TSDB_CODE_FAILED;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// need the resize process, write lock applied
if (SHASH_NEED_RESIZE(pHashObj)) {
tSimpleHashTableResize(pHashObj);
int32_t code = tSimpleHashTableResize(pHashObj);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
@ -226,7 +228,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
if (!pNode) {
SHNode *pNewNode = doCreateHashNode(pHashObj, key, keyLen, data, dataLen, hashVal);
if (!pNewNode) {
return -1;
return terrno;
}
pHashObj->hashList[slot] = pNewNode;
@ -244,7 +246,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
if (!pNode) {
SHNode *pNewNode = doCreateHashNode(pHashObj, key, keyLen, data, dataLen, hashVal);
if (!pNewNode) {
return -1;
return terrno;
}
pNewNode->next = pHashObj->hashList[slot];
pHashObj->hashList[slot] = pNewNode;