enhance: remove length col and refactor

This commit is contained in:
slzhou 2023-12-20 09:54:27 +08:00
parent f3847a00c6
commit bde2cf7b34
2 changed files with 28 additions and 23 deletions

View File

@ -312,7 +312,7 @@ typedef struct STableMergeScanInfo {
bool rtnNextDurationBlocks; bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx; int32_t nextDurationBlocksIdx;
bool bSortRowId; bool bSortRowId;
STmsSortRowIdInfo tmsSortRowIdInfo; STmsSortRowIdInfo sortRowIdInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanFilterContext { typedef struct STagScanFilterContext {

View File

@ -3221,12 +3221,11 @@ _error:
// ========================= table merge scan // ========================= table merge scan
static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
SDiskbasedBuf* pResultBuf = pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf; SDiskbasedBuf* pResultBuf = pInfo->sortRowIdInfo.pExtSrcRowsBuf;
int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock); int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock);
SFilePage* pFilePage = NULL; SFilePage* pFilePage = NULL;
// in the first scan, new space needed for results
int32_t pageId = -1; int32_t pageId = -1;
SArray* list = getDataBufPagesIdList(pResultBuf); SArray* list = getDataBufPagesIdList(pResultBuf);
@ -3244,7 +3243,6 @@ static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock
pageId = getPageId(pi); pageId = getPageId(pi);
if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) { if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi); releaseBufPageInfo(pResultBuf, pi);
pFilePage = getNewBufPage(pResultBuf, &pageId); pFilePage = getNewBufPage(pResultBuf, &pageId);
@ -3255,8 +3253,10 @@ static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock
} }
if (pFilePage == NULL) { if (pFilePage == NULL) {
return -1; qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
} }
*pPageId = pageId; *pPageId = pageId;
*pOffset = pFilePage->num; *pOffset = pFilePage->num;
char* buf = (char*)pFilePage + (*pOffset); char* buf = (char*)pFilePage + (*pOffset);
@ -3298,8 +3298,8 @@ static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock
return 0; return 0;
} }
static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { static int32_t fillSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo;
int32_t nRows = pSrcBlock->info.rows; int32_t nRows = pSrcBlock->info.rows;
pSortInputBlk->info.window = pSrcBlock->info.window; pSortInputBlk->info.window = pSrcBlock->info.window;
@ -3321,7 +3321,6 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo
saveBlockRowToBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length); saveBlockRowToBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length);
colDataSetInt32(pageIdCol, i, &pageId); colDataSetInt32(pageIdCol, i, &pageId);
colDataSetInt32(offsetCol, i, &offset); colDataSetInt32(offsetCol, i, &offset);
colDataSetInt32(lengthCol, i, &length);
} }
pSortInputBlk->info.rows = nRows; pSortInputBlk->info.rows = nRows;
@ -3329,13 +3328,12 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo
return 0; return 0;
} }
void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { static void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo;
int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 3); void* page = getBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, pageId);
void* page = getBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, pageId);
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* buf = (char*)page + offset; char* buf = (char*)page + offset;
@ -3359,7 +3357,7 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc
colDataSetNULL(pColInfo, pBlock->info.rows); colDataSetNULL(pColInfo, pBlock->info.rows);
} }
} }
releaseBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, page); releaseBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, page);
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
@ -3511,7 +3509,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
SSDataBlock* pSortInputBlk = NULL; SSDataBlock* pSortInputBlk = NULL;
if (pInfo->bSortRowId) { if (pInfo->bSortRowId) {
blockDataCleanup(pInfo->pSortInputBlock); blockDataCleanup(pInfo->pSortInputBlock);
transformIntoSortInputBlock(pInfo, pBlock, pInfo->pSortInputBlock); fillSortInputBlock(pInfo, pBlock, pInfo->pSortInputBlock);
pSortInputBlk = pInfo->pSortInputBlock; pSortInputBlk = pInfo->pSortInputBlock;
} else { } else {
pSortInputBlk = pBlock; pSortInputBlk = pBlock;
@ -3568,17 +3566,17 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
} }
int32_t startRowIdSort(STableMergeScanInfo *pInfo) { int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo;
int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock), int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock),
taosArrayGetSize(pInfo->pResBlock->pDataBlock)); taosArrayGetSize(pInfo->pResBlock->pDataBlock));
int32_t memSize = pageSize * 4 * 8192; int32_t memSize = pageSize * 1024;
createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize * 4, memSize, "tms-ext-src-block", tsTempDir); int32_t code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir);
dBufSetPrintInfo(pSort->pExtSrcRowsBuf); dBufSetPrintInfo(pSort->pExtSrcRowsBuf);
return 0; return code;
} }
int32_t stopRowIdSort(STableMergeScanInfo *pInfo) { int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo;
destroyDiskbasedBuf(pSort->pExtSrcRowsBuf); destroyDiskbasedBuf(pSort->pExtSrcRowsBuf);
pSort->pExtSrcRowsBuf = NULL; pSort->pExtSrcRowsBuf = NULL;
@ -3595,7 +3593,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->bNewFilesetEvent = false; pInfo->bNewFilesetEvent = false;
pInfo->bNextDurationBlockEvent = false; pInfo->bNextDurationBlockEvent = false;
startRowIdSort(pInfo); code = startRowIdSort(pInfo);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
pInfo->sortBufSize = 2048 * pInfo->bufPageSize; pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
@ -3814,6 +3816,11 @@ void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond); cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
if (pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf != NULL) {
destroyDiskbasedBuf(pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf);
pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf = NULL;
}
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
@ -3931,8 +3938,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
blockDataAppendColInfo(pSortInput, &pageIdCol); blockDataAppendColInfo(pSortInput, &pageIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol); blockDataAppendColInfo(pSortInput, &offsetCol);
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
blockDataAppendColInfo(pSortInput, &lengthCol);
pInfo->pSortInputBlock = pSortInput; pInfo->pSortInputBlock = pSortInput;
int32_t srcTsSlotId = 0; int32_t srcTsSlotId = 0;
@ -3942,7 +3947,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
srcTsSlotId = colInfo->dstSlotId; srcTsSlotId = colInfo->dstSlotId;
} }
} }
pInfo->tmsSortRowIdInfo.srcTsSlotId = srcTsSlotId; pInfo->sortRowIdInfo.srcTsSlotId = srcTsSlotId;
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo bi = {0}; SBlockOrderInfo bi = {0};
bi.order = pInfo->base.cond.order; bi.order = pInfo->base.cond.order;