Merge pull request #24360 from taosdata/szhou/fix/TD-28169
fix: sort error processing of out of space
This commit is contained in:
commit
003e4a886d
|
@ -157,7 +157,7 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/c
|
||||||
Automatically creating table and the table name is specified through the `tbname` column
|
Automatically creating table and the table name is specified through the `tbname` column
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
INSERT INTO meters(tbname, location, groupId, ts, current, phase)
|
INSERT INTO meters(tbname, location, groupId, ts, current, voltage, phase)
|
||||||
values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
|
values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
|
||||||
('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
|
('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
|
||||||
('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
|
('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
|
||||||
|
|
|
@ -157,7 +157,7 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/c
|
||||||
|
|
||||||
自动建表, 表名通过tbname列指定
|
自动建表, 表名通过tbname列指定
|
||||||
```sql
|
```sql
|
||||||
INSERT INTO meters(tbname, location, groupId, ts, current, phase)
|
INSERT INTO meters(tbname, location, groupId, ts, current, voltage, phase)
|
||||||
values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
|
values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
|
||||||
('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
|
('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
|
||||||
('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
|
('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
|
||||||
|
|
|
@ -878,6 +878,9 @@ static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* par
|
||||||
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
|
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
|
||||||
int32_t pageId = -1;
|
int32_t pageId = -1;
|
||||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||||
|
if (pPage == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
taosArrayPush(aPgId, &pageId);
|
taosArrayPush(aPgId, &pageId);
|
||||||
|
|
||||||
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
|
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
|
||||||
|
@ -944,16 +947,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
totalRows += blk->info.rows;
|
totalRows += blk->info.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
|
|
||||||
|
|
||||||
SMultiwayMergeTreeInfo* pTree = NULL;
|
SMultiwayMergeTreeInfo* pTree = NULL;
|
||||||
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
taosMemoryFree(sup.aRowIdx);
|
taosMemoryFree(sup.aRowIdx);
|
||||||
taosMemoryFree(sup.aTs);
|
taosMemoryFree(sup.aTs);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
|
||||||
int32_t nRows = 0;
|
int32_t nRows = 0;
|
||||||
int32_t nMergedRows = 0;
|
int32_t nMergedRows = 0;
|
||||||
bool mergeLimitReached = false;
|
bool mergeLimitReached = false;
|
||||||
|
@ -969,7 +971,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
||||||
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(pTree);
|
||||||
|
taosArrayDestroy(aPgId);
|
||||||
|
taosMemoryFree(sup.aRowIdx);
|
||||||
|
taosMemoryFree(sup.aTs);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
nMergedRows += pHandle->pDataBlock->info.rows;
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
blkPgSz = pgHeaderSz;
|
blkPgSz = pgHeaderSz;
|
||||||
|
@ -1001,7 +1010,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
if (!mergeLimitReached) {
|
if (!mergeLimitReached) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
|
||||||
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosArrayDestroy(aPgId);
|
||||||
|
taosMemoryFree(pTree);
|
||||||
|
taosMemoryFree(sup.aRowIdx);
|
||||||
|
taosMemoryFree(sup.aTs);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
nMergedRows += pHandle->pDataBlock->info.rows;
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
mergeLimitReached = true;
|
mergeLimitReached = true;
|
||||||
|
|
Loading…
Reference in New Issue