fix(query):add check for no disk error.
This commit is contained in:
parent
2d951c1690
commit
5f89fe7652
|
@ -190,8 +190,9 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
qError("Add to buf failed since %s", terrstr(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
|
||||
"doAddToBuf", tsTempDir);
|
||||
"sortExternalBuf", tsTempDir);
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -635,6 +636,7 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
|
|||
|
||||
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
|
@ -663,8 +665,8 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != 0) {
|
||||
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (source->param && !source->onlyRef) {
|
||||
taosMemoryFree(source->param);
|
||||
}
|
||||
|
@ -689,6 +691,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
blockDataDestroy(source->src.pBlock);
|
||||
source->src.pBlock = NULL;
|
||||
}
|
||||
|
||||
taosMemoryFree(source);
|
||||
return code;
|
||||
}
|
||||
|
@ -696,13 +699,17 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (source->param && !source->onlyRef) {
|
||||
taosMemoryFree(source->param);
|
||||
}
|
||||
|
||||
taosMemoryFree(source);
|
||||
|
||||
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
||||
|
@ -711,7 +718,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||
int64_t p = taosGetTimestampUs();
|
||||
|
||||
int32_t code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -729,12 +736,12 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
||||
return 0;
|
||||
} else {
|
||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsortOpen(SSortHandle* pHandle) {
|
||||
|
|
|
@ -126,6 +126,30 @@ static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
|
|||
|
||||
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
|
||||
|
||||
static int32_t doFlushBufPageImpl(SDiskbasedBuf* pBuf, int64_t offset, const char* pData, int32_t size) {
|
||||
int32_t ret = taosLSeekFile(pBuf->pFile, offset, SEEK_SET);
|
||||
if (ret == -1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
ret = (int32_t)taosWriteFile(pBuf->pFile, pData, size);
|
||||
if (ret != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// extend the file
|
||||
if (pBuf->fileSize < offset + size) {
|
||||
pBuf->fileSize = offset + size;
|
||||
}
|
||||
|
||||
pBuf->statis.flushBytes += size;
|
||||
pBuf->statis.flushPages += 1;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
||||
if (pg->pData == NULL || pg->used) {
|
||||
uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
|
||||
|
@ -134,6 +158,8 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
}
|
||||
|
||||
int32_t size = pBuf->pageSize;
|
||||
int64_t offset = pg->offset;
|
||||
|
||||
char* t = NULL;
|
||||
if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
|
||||
void* payload = GET_PAYLOAD_DATA(pg);
|
||||
|
@ -147,59 +173,29 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
// this page is flushed to disk for the first time
|
||||
if (pg->dirty) {
|
||||
if (!HAS_DATA_IN_DISK(pg)) {
|
||||
pg->offset = allocateNewPositionInFile(pBuf, size);
|
||||
offset = allocateNewPositionInFile(pBuf, size);
|
||||
pBuf->nextPos += size;
|
||||
|
||||
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
|
||||
if (ret == -1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
|
||||
if (ret != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// extend the file size
|
||||
if (pBuf->fileSize < pg->offset + size) {
|
||||
pBuf->fileSize = pg->offset + size;
|
||||
}
|
||||
|
||||
pBuf->statis.flushBytes += size;
|
||||
pBuf->statis.flushPages += 1;
|
||||
} else {
|
||||
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
|
||||
if (pg->length < size) {
|
||||
// 1. add current space to free list
|
||||
SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
|
||||
SPageDiskInfo dinfo = {.length = pg->length, .offset = offset};
|
||||
taosArrayPush(pBuf->pFree, &dinfo);
|
||||
|
||||
// 2. allocate new position, and update the info
|
||||
pg->offset = allocateNewPositionInFile(pBuf, size);
|
||||
offset = allocateNewPositionInFile(pBuf, size);
|
||||
pBuf->nextPos += size;
|
||||
}
|
||||
|
||||
// 3. write to disk.
|
||||
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
|
||||
if (ret == -1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
|
||||
if (ret != size) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pBuf->fileSize < pg->offset + size) {
|
||||
pBuf->fileSize = pg->offset + size;
|
||||
}
|
||||
|
||||
pBuf->statis.flushBytes += size;
|
||||
pBuf->statis.flushPages += 1;
|
||||
}
|
||||
} else { // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
|
||||
size = pg->length;
|
||||
|
@ -209,9 +205,10 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
|
||||
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
|
||||
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, offset);
|
||||
#endif
|
||||
|
||||
pg->offset = offset;
|
||||
pg->length = size; // on disk size
|
||||
return pDataBuf;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue