fix(query): check for failure during add new buf pages.
This commit is contained in:
parent
ba092cede8
commit
21d57a3624
|
@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
|
|
||||||
int32_t pageId = 0;
|
int32_t pageId = 0;
|
||||||
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
||||||
taosArrayPush(p->pPageList, &pageId);
|
if (pPage == NULL) {
|
||||||
|
return pPage;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(p->pPageList, &pageId);
|
||||||
*(int32_t*)pPage = 0;
|
*(int32_t*)pPage = 0;
|
||||||
} else {
|
} else {
|
||||||
int32_t* curId = taosArrayGetLast(p->pPageList);
|
int32_t* curId = taosArrayGetLast(p->pPageList);
|
||||||
|
@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
// add a new page for current group
|
// add a new page for current group
|
||||||
int32_t pageId = 0;
|
int32_t pageId = 0;
|
||||||
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
||||||
|
if (pPage == NULL) {
|
||||||
|
qError("failed to get new buffer, code:%s", tstrerror(terrno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(p->pPageList, &pageId);
|
taosArrayPush(p->pPageList, &pageId);
|
||||||
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
||||||
}
|
}
|
||||||
|
|
|
@ -222,7 +222,6 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
||||||
char* p = doFlushPageToDisk(pBuf, pg);
|
char* p = doFlushPageToDisk(pBuf, pg);
|
||||||
setPageNotInBuf(pg);
|
setPageNotInBuf(pg);
|
||||||
pg->dirty = false;
|
pg->dirty = false;
|
||||||
|
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,32 +285,21 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
|
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
|
||||||
char* bufPage = NULL;
|
|
||||||
SListNode* pn = getEldestUnrefedPage(pBuf);
|
SListNode* pn = getEldestUnrefedPage(pBuf);
|
||||||
terrno = 0;
|
if (pn == NULL) { // no available buffer pages now, return.
|
||||||
|
return NULL;
|
||||||
// all pages are referenced by user, try to allocate new space
|
|
||||||
if (pn == NULL) {
|
|
||||||
int32_t prev = pBuf->inMemPages;
|
|
||||||
|
|
||||||
// increase by 50% of previous mem pages
|
|
||||||
pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
|
|
||||||
|
|
||||||
// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev,
|
|
||||||
// pBuf->inMemPages, pBuf->pageSize);
|
|
||||||
} else {
|
|
||||||
tdListPopNode(pBuf->lruList, pn);
|
|
||||||
|
|
||||||
SPageInfo* d = *(SPageInfo**)pn->data;
|
|
||||||
ASSERTS(d->pn == pn, "d->pn not equal pn");
|
|
||||||
|
|
||||||
d->pn = NULL;
|
|
||||||
taosMemoryFreeClear(pn);
|
|
||||||
|
|
||||||
bufPage = flushPageToDisk(pBuf, d);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return bufPage;
|
terrno = 0;
|
||||||
|
tdListPopNode(pBuf->lruList, pn);
|
||||||
|
|
||||||
|
SPageInfo* d = *(SPageInfo**)pn->data;
|
||||||
|
ASSERTS(d->pn == pn, "d->pn not equal pn");
|
||||||
|
|
||||||
|
d->pn = NULL;
|
||||||
|
taosMemoryFreeClear(pn);
|
||||||
|
|
||||||
|
return flushPageToDisk(pBuf, d);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void lruListPushFront(SList* pList, SPageInfo* pi) {
|
static void lruListPushFront(SList* pList, SPageInfo* pi) {
|
||||||
|
@ -338,7 +326,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
||||||
|
|
||||||
SDiskbasedBuf* pPBuf = *pBuf;
|
SDiskbasedBuf* pPBuf = *pBuf;
|
||||||
if (pPBuf == NULL) {
|
if (pPBuf == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pPBuf->pageSize = pagesize;
|
pPBuf->pageSize = pagesize;
|
||||||
|
@ -362,24 +350,50 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
||||||
pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
|
pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
|
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
|
||||||
pPBuf->all = taosHashInit(10, fn, true, false);
|
if (pPBuf->assistBuf == NULL) {
|
||||||
pPBuf->prefix = (char*) dir;
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPBuf->all = taosHashInit(10, fn, true, false);
|
||||||
|
if (pPBuf->all == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPBuf->prefix = (char*) dir;
|
||||||
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
|
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
|
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
|
||||||
// pPBuf->pageSize,
|
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
|
||||||
// pPBuf->inMemPages, pPBuf->path);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_error:
|
||||||
|
destroyDiskbasedBuf(pPBuf);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
static char* doExtractPage(SDiskbasedBuf* pBuf) {
|
||||||
|
char* availablePage = NULL;
|
||||||
|
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
|
||||||
|
availablePage = evacOneDataPage(pBuf);
|
||||||
|
if (availablePage == NULL) {
|
||||||
|
uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
|
||||||
|
if (availablePage == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return availablePage;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
||||||
pBuf->statis.getPages += 1;
|
pBuf->statis.getPages += 1;
|
||||||
|
|
||||||
char* availablePage = NULL;
|
char* availablePage = doExtractPage(pBuf);
|
||||||
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
|
if (availablePage == NULL) {
|
||||||
availablePage = evacOneDataPage(pBuf);
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SPageInfo* pi = NULL;
|
SPageInfo* pi = NULL;
|
||||||
|
@ -402,16 +416,8 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to LRU list
|
// add to LRU list
|
||||||
ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
|
|
||||||
lruListPushFront(pBuf->lruList, pi);
|
lruListPushFront(pBuf->lruList, pi);
|
||||||
|
pi->pData = availablePage;
|
||||||
// allocate buf
|
|
||||||
if (availablePage == NULL) {
|
|
||||||
pi->pData =
|
|
||||||
taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
|
|
||||||
} else {
|
|
||||||
pi->pData = availablePage;
|
|
||||||
}
|
|
||||||
|
|
||||||
((void**)pi->pData)[0] = pi;
|
((void**)pi->pData)[0] = pi;
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
|
@ -447,18 +453,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
||||||
ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL &&
|
ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL &&
|
||||||
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
||||||
|
|
||||||
char* availablePage = NULL;
|
(*pi)->pData = doExtractPage(pBuf);
|
||||||
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
|
if ((*pi)->pData == NULL) {
|
||||||
availablePage = evacOneDataPage(pBuf);
|
return NULL;
|
||||||
if (availablePage == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (availablePage == NULL) {
|
|
||||||
(*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
|
|
||||||
} else {
|
|
||||||
(*pi)->pData = availablePage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the ptr to the new SPageInfo
|
// set the ptr to the new SPageInfo
|
||||||
|
@ -471,6 +468,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
||||||
if ((*pi)->length > 0 && (*pi)->offset >= 0) {
|
if ((*pi)->length > 0 && (*pi)->offset >= 0) {
|
||||||
int32_t code = loadPageFromDisk(pBuf, *pi);
|
int32_t code = loadPageFromDisk(pBuf, *pi);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue