Merge pull request #19836 from taosdata/fix/nodisk

fix(query):add check for no disk error.
This commit is contained in:
Haojun Liao 2023-02-07 13:36:30 +08:00 committed by GitHub
commit 25f7ac7991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 46 deletions

View File

@ -190,8 +190,9 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
qError("Add to buf failed since %s", terrstr(terrno)); qError("Add to buf failed since %s", terrstr(terrno));
return terrno; return terrno;
} }
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"doAddToBuf", tsTempDir); "sortExternalBuf", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf); dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -635,6 +636,7 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
static int32_t createInitialSources(SSortHandle* pHandle) { static int32_t createInitialSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
int32_t code = 0;
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
@ -663,8 +665,8 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
pHandle->beforeFp(pBlock, pHandle->param); pHandle->beforeFp(pBlock, pHandle->param);
} }
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
if (source->param && !source->onlyRef) { if (source->param && !source->onlyRef) {
taosMemoryFree(source->param); taosMemoryFree(source->param);
} }
@ -689,6 +691,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
blockDataDestroy(source->src.pBlock); blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL; source->src.pBlock = NULL;
} }
taosMemoryFree(source); taosMemoryFree(source);
return code; return code;
} }
@ -696,13 +699,17 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
doAddToBuf(pHandle->pDataBlock, pHandle); code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} }
if (source->param && !source->onlyRef) { if (source->param && !source->onlyRef) {
taosMemoryFree(source->param); taosMemoryFree(source->param);
} }
taosMemoryFree(source); taosMemoryFree(source);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
@ -711,7 +718,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
int32_t code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) { if (code != 0) {
return code; return code;
} }
@ -729,12 +736,12 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
pHandle->tupleHandle.pBlock = pHandle->pDataBlock; pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return 0; return 0;
} else { } else {
doAddToBuf(pHandle->pDataBlock, pHandle); code = doAddToBuf(pHandle->pDataBlock, pHandle);
} }
} }
} }
return TSDB_CODE_SUCCESS; return code;
} }
int32_t tsortOpen(SSortHandle* pHandle) { int32_t tsortOpen(SSortHandle* pHandle) {

View File

@ -126,6 +126,30 @@ static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
static int32_t doFlushBufPageImpl(SDiskbasedBuf* pBuf, int64_t offset, const char* pData, int32_t size) {
int32_t ret = taosLSeekFile(pBuf->pFile, offset, SEEK_SET);
if (ret == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
ret = (int32_t)taosWriteFile(pBuf->pFile, pData, size);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
// extend the file
if (pBuf->fileSize < offset + size) {
pBuf->fileSize = offset + size;
}
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
return TSDB_CODE_SUCCESS;
}
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->pData == NULL || pg->used) { if (pg->pData == NULL || pg->used) {
uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id); uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
@ -134,7 +158,9 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
int32_t size = pBuf->pageSize; int32_t size = pBuf->pageSize;
char* t = NULL; int64_t offset = pg->offset;
char* t = NULL;
if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) { if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
void* payload = GET_PAYLOAD_DATA(pg); void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf); t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
@ -147,59 +173,29 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
// this page is flushed to disk for the first time // this page is flushed to disk for the first time
if (pg->dirty) { if (pg->dirty) {
if (!HAS_DATA_IN_DISK(pg)) { if (!HAS_DATA_IN_DISK(pg)) {
pg->offset = allocateNewPositionInFile(pBuf, size); offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size; pBuf->nextPos += size;
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
if (ret == -1) { if (code != TSDB_CODE_SUCCESS) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
// extend the file size
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
} else { } else {
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
if (pg->length < size) { if (pg->length < size) {
// 1. add current space to free list // 1. add current space to free list
SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset}; SPageDiskInfo dinfo = {.length = pg->length, .offset = offset};
taosArrayPush(pBuf->pFree, &dinfo); taosArrayPush(pBuf->pFree, &dinfo);
// 2. allocate new position, and update the info // 2. allocate new position, and update the info
pg->offset = allocateNewPositionInFile(pBuf, size); offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size; pBuf->nextPos += size;
} }
// 3. write to disk. int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); if (code != TSDB_CODE_SUCCESS) {
if (ret == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
} }
} else { // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet. } else { // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
size = pg->length; size = pg->length;
@ -209,9 +205,10 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize)); memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset); uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, offset);
#endif #endif
pg->offset = offset;
pg->length = size; // on disk size pg->length = size; // on disk size
return pDataBuf; return pDataBuf;
} }