fix: save data when creating intial sources

This commit is contained in:
shenglian zhou 2024-01-03 14:03:22 +08:00
parent 91a705f731
commit fbf67e4294
4 changed files with 276 additions and 280 deletions

View File

@ -273,11 +273,6 @@ typedef struct STableScanInfo {
bool filesetDelimited;
} STableScanInfo;
typedef struct STmsSortRowIdInfo {
SDiskbasedBuf* pExtSrcRowsBuf;
int32_t srcTsSlotId;
} STmsSortRowIdInfo;
typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
@ -312,7 +307,6 @@ typedef struct STableMergeScanInfo {
bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx;
bool bSortRowId;
STmsSortRowIdInfo sortRowIdInfo;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {

View File

@ -194,6 +194,9 @@ void tsortSetClosed(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle);
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsSize);
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle);
/**
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
* @param [in] pSortCols cols to comp and build

View File

@ -3220,179 +3220,6 @@ _error:
// ========================= table merge scan
static int32_t getPageFromExtSrcRowsBuf(SDiskbasedBuf* pResultBuf, int32_t rowBytes, int32_t* pPageId, SFilePage** ppFilePage) {
SFilePage* pFilePage = NULL;
int32_t pageId = -1;
SArray* list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pFilePage = getNewBufPage(pResultBuf, &pageId);
pFilePage->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pFilePage = getBufPage(pResultBuf, getPageId(pi));
if (pFilePage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi);
if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) {
releaseBufPageInfo(pResultBuf, pi);
pFilePage = getNewBufPage(pResultBuf, &pageId);
if (pFilePage != NULL) {
pFilePage->num = sizeof(SFilePage);
}
}
}
if (pFilePage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
*pPageId = pageId;
*ppFilePage = pFilePage;
return TSDB_CODE_SUCCESS;
}
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pCol, rowIdx)) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
char* pData = colDataGetData(pCol, rowIdx);
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
if (pCol->pData) {
int32_t dataLen = getJsonValueLen(pData);
memcpy(pStart, pData, dataLen);
pStart += dataLen;
} else {
// the column that is pre-allocated has no data and has offset
*pStart = 0;
pStart += 1;
}
} else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
if (pCol->pData) {
varDataCopy(pStart, pData);
pStart += varDataTLen(pData);
} else {
// the column that is pre-allocated has no data and has offset
*(VarDataLenT*)(pStart) = 0;
pStart += VARSTR_HEADER_SIZE;
}
} else {
int32_t bytes = pCol->info.bytes;
memcpy(pStart, pData, bytes);
pStart += bytes;
}
}
*(int32_t*)pStart = (char*)pStart - (char*)buf;
pStart += sizeof(int32_t);
return (int32_t)(pStart - (char*)buf);
}
static int32_t saveBlockRowToExtRowsBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
SDiskbasedBuf* pResultBuf = pInfo->sortRowIdInfo.pExtSrcRowsBuf;
int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock) + sizeof(int32_t);
int32_t pageId = -1;
SFilePage* pFilePage = NULL;
int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*pPageId = pageId;
*pOffset = pFilePage->num;
*pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset));
pFilePage->num += (*pLength);
setBufPageDirty(pFilePage, true);
releaseBufPage(pResultBuf, pFilePage);
return 0;
}
static int32_t fillSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo;
int32_t nRows = pSrcBlock->info.rows;
pSortInputBlk->info.window = pSrcBlock->info.window;
pSortInputBlk->info.id = pSrcBlock->info.id;
blockDataEnsureCapacity(pSortInputBlk, nRows);
SColumnInfoData* tsCol = taosArrayGet(pSortInputBlk->pDataBlock, 0);
SColumnInfoData* pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, pSortInfo->srcTsSlotId);
colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info);
SColumnInfoData* pageIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1);
SColumnInfoData* offsetCol = taosArrayGet(pSortInputBlk->pDataBlock, 2);
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
int32_t pageId = -1;
int32_t offset = -1;
int32_t length = -1;
saveBlockRowToExtRowsBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length);
colDataSetInt32(pageIdCol, i, &pageId);
colDataSetInt32(offsetCol, i, &offset);
}
pSortInputBlk->info.rows = nRows;
return 0;
}
static void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo;
int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
void* page = getBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, pageId);
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* buf = (char*)page + offset;
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
if (!isNull[i]) {
colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pStart);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
pStart += varDataTLen(pStart);
} else {
int32_t bytes = pColInfo->info.bytes;
pStart += bytes;
}
} else {
colDataSetNULL(pColInfo, pBlock->info.rows);
}
}
if (*(int32_t*)pStart != pStart-buf) {
qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart, (int32_t)(pStart-buf));
};
releaseBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, page);
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1;
}
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
@ -3535,18 +3362,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
SSDataBlock* pSortInputBlk = NULL;
if (pInfo->bSortRowId) {
blockDataCleanup(pInfo->pSortInputBlock);
fillSortInputBlock(pInfo, pBlock, pInfo->pSortInputBlock);
pSortInputBlk = pInfo->pSortInputBlock;
} else {
pSortInputBlk = pBlock;
}
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pSortInputBlk;
return pBlock;
}
return NULL;
@ -3594,26 +3412,6 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
return;
}
int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo;
int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock),
taosArrayGetSize(pInfo->pResBlock->pDataBlock));
pageSize *= 2;
int numOfTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
int32_t memSize = TMIN(pageSize * numOfTables, 512 * 1024 * 1024);
int32_t code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir);
dBufSetPrintInfo(pSort->pExtSrcRowsBuf);
return code;
}
int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo;
destroyDiskbasedBuf(pSort->pExtSrcRowsBuf);
pSort->pExtSrcRowsBuf = NULL;
return 0;
}
int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3624,16 +3422,20 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->bNewFilesetEvent = false;
pInfo->bNextDurationBlockEvent = false;
code = startRowIdSort(pInfo);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
if (pInfo->bSortRowId) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
int32_t memSize = 512 * 1024 * 1024;
int32_t rowBytes = blockDataGetRowSize(pInfo->pResBlock) + taosArrayGetSize(pInfo->pResBlock->pDataBlock) + sizeof(int32_t);
int32_t pageSize = TMAX(memSize/numOfTable, rowBytes);
tsortSetSortByRowId(pInfo->pSortHandle, pageSize, memSize);
} else {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
}
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
@ -3670,7 +3472,6 @@ void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
stopRowIdSort(pInfo);
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
@ -3757,11 +3558,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
if (pTupleHandle == NULL) {
break;
}
if (!pInfo->bSortRowId) {
appendOneRowToDataBlock(pResBlock, pTupleHandle);
} else {
appendOneRowIdRowToDataBlock(pInfo, pResBlock, pTupleHandle);
}
tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle);
if (pResBlock->info.rows >= capacity) {
break;
}
@ -3847,11 +3644,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
if (pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf != NULL) {
destroyDiskbasedBuf(pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf);
pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf = NULL;
}
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
@ -3895,34 +3687,6 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS;
}
static void initRowIdSortputBlock(STableMergeScanInfo* pInfo) {
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &pageIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol);
pInfo->pSortInputBlock = pSortInput;
int32_t srcTsSlotId = 0;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* colInfo = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
srcTsSlotId = colInfo->dstSlotId;
}
}
pInfo->sortRowIdInfo.srcTsSlotId = srcTsSlotId;
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo bi = {0};
bi.order = pInfo->base.cond.order;
bi.slotId = 0;
bi.nullFirst = NULL_ORDER_FIRST;
taosArrayPush(pList, &bi);
pInfo->pSortInfo = pList;
return;
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
@ -4000,12 +3764,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->bSortRowId = false;
}
if (!pInfo->bSortRowId) {
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
} else {
initRowIdSortputBlock(pInfo);
}
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
int32_t rowSize = blockDataGetRowSize(pInfo->pSortInputBlock);

View File

@ -75,6 +75,13 @@ struct SSortHandle {
bool (*abortCheckFn)(void* param);
void* abortCheckParam;
bool bSortByRowId;
SDiskbasedBuf* pExtRowsBuf;
int32_t extRowsPageSize;
int32_t extRowsMemSize;
int32_t srcTsSlotId;
SBlockOrderInfo extRowsOrderInfo;
};
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
@ -199,7 +206,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->type = type;
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
pSortHandle->loops = 0;
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
@ -303,6 +310,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
taosArrayDestroy(pSortHandle->pOrderedSource);
taosMemoryFreeClear(pSortHandle);
if (pSortHandle->pExtRowsBuf != NULL) {
destroyDiskbasedBuf(pSortHandle->pExtRowsBuf);
}
taosArrayDestroy(pSortHandle->pSortInfo);
}
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
@ -848,6 +859,228 @@ static int32_t createPageBuf(SSortHandle* pHandle) {
return 0;
}
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
if (pHandle->bSortByRowId) {
int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
void* page = getBufPage(pHandle->pExtRowsBuf, pageId);
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* buf = (char*)page + offset;
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
if (!isNull[i]) {
colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pStart);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
pStart += varDataTLen(pStart);
} else {
int32_t bytes = pColInfo->info.bytes;
pStart += bytes;
}
} else {
colDataSetNULL(pColInfo, pBlock->info.rows);
}
}
if (*(int32_t*)pStart != pStart - buf) {
qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart,
(int32_t)(pStart - buf));
};
releaseBufPage(pHandle->pExtRowsBuf, page);
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1;
} else {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
bool isNull = tsortIsNullVal(pTupleHandle, i);
if (isNull) {
colDataSetNULL(pColInfo, pBlock->info.rows);
} else {
char* pData = tsortGetValue(pTupleHandle, i);
if (pData != NULL) {
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1;
}
}
static int32_t getPageFromExtSrcRowsBuf(SDiskbasedBuf* pResultBuf, int32_t rowBytes, int32_t* pPageId, SFilePage** ppFilePage) {
SFilePage* pFilePage = NULL;
int32_t pageId = -1;
SArray* list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pFilePage = getNewBufPage(pResultBuf, &pageId);
pFilePage->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pFilePage = getBufPage(pResultBuf, getPageId(pi));
if (pFilePage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi);
if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) {
releaseBufPageInfo(pResultBuf, pi);
pFilePage = getNewBufPage(pResultBuf, &pageId);
if (pFilePage != NULL) {
pFilePage->num = sizeof(SFilePage);
}
}
}
if (pFilePage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
*pPageId = pageId;
*ppFilePage = pFilePage;
return TSDB_CODE_SUCCESS;
}
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pCol, rowIdx)) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
char* pData = colDataGetData(pCol, rowIdx);
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
if (pCol->pData) {
int32_t dataLen = getJsonValueLen(pData);
memcpy(pStart, pData, dataLen);
pStart += dataLen;
} else {
// the column that is pre-allocated has no data and has offset
*pStart = 0;
pStart += 1;
}
} else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
if (pCol->pData) {
varDataCopy(pStart, pData);
pStart += varDataTLen(pData);
} else {
// the column that is pre-allocated has no data and has offset
*(VarDataLenT*)(pStart) = 0;
pStart += VARSTR_HEADER_SIZE;
}
} else {
int32_t bytes = pCol->info.bytes;
memcpy(pStart, pData, bytes);
pStart += bytes;
}
}
*(int32_t*)pStart = (char*)pStart - (char*)buf;
pStart += sizeof(int32_t);
return (int32_t)(pStart - (char*)buf);
}
static int32_t saveBlockRowToExtRowsBuf(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf;
int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock) + sizeof(int32_t);
int32_t pageId = -1;
SFilePage* pFilePage = NULL;
int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*pPageId = pageId;
*pOffset = pFilePage->num;
*pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset));
pFilePage->num += (*pLength);
setBufPageDirty(pFilePage, true);
releaseBufPage(pResultBuf, pFilePage);
return 0;
}
static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
int32_t pageId = -1;
int32_t offset = -1;
int32_t length = -1;
saveBlockRowToExtRowsBuf(pHandle, pSource, *rowIndex, &pageId, &offset, &length);
SSDataBlock* pBlock = pHandle->pDataBlock;
SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
char* pData = colDataGetData(pSrcTsCol, *rowIndex);
colDataSetVal(pTsCol, pBlock->info.rows, pData, false);
SColumnInfoData* pPageIdCol = taosArrayGet(pBlock->pDataBlock, 1);
colDataSetInt32(pPageIdCol, pBlock->info.rows, &pageId);
SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2);
colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset);
pBlock->info.rows += 1;
*rowIndex += 1;
}
static void initRowIdSort(SSortHandle* pHandle) {
blockDataDestroy(pHandle->pDataBlock);
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &pageIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol);
pHandle->pDataBlock = pSortInput;
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo bi = {0};
bi.order = pOrder->order;
bi.slotId = 0;
bi.nullFirst = NULL_ORDER_FIRST;
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(aOrder, &bi);
taosArrayDestroy(pHandle->pSortInfo);
pHandle->pSortInfo = aOrder;
return;
}
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsMemSize) {
int32_t code = createDiskbasedBuf(&pHandle->pExtRowsBuf, extRowsPageSize, extRowsMemSize, "sort-ext-rows", tsTempDir);
pHandle->extRowsPageSize = extRowsPageSize;
pHandle->extRowsMemSize = extRowsMemSize;
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
pHandle->extRowsOrderInfo = *pOrder;
initRowIdSort(pHandle);
pHandle->bSortByRowId = true;
return code;
}
typedef struct SBlkMergeSupport {
int64_t** aTs;
int32_t* aRowIdx;
@ -919,7 +1152,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx
return sz;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS;
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
@ -927,13 +1160,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlockOrderInfo* pOrigBlockOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
SBlockOrderInfo* pHandleBlockOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlkMergeSupport sup;
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.order = order->order;
sup.order = pOrigBlockOrder->order;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockOrder->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
}
@ -958,8 +1193,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t lastPageBufTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
@ -967,7 +1202,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
@ -977,15 +1212,19 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
}
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
if (!pHandle->bSortByRowId) {
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
} else {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
}
blkPgSz += bufInc;
++nRows;
@ -999,14 +1238,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
}
@ -1025,7 +1264,6 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
@ -1040,7 +1278,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
if (pOrder->order == TSDB_ORDER_ASC) {
SBlockOrderInfo* pOrigOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
if (pOrigOrder->order == TSDB_ORDER_ASC) {
pHandle->currMergeLimitTs = INT64_MAX;
} else {
pHandle->currMergeLimitTs = INT64_MIN;
@ -1051,10 +1290,10 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
continue;
}
}
@ -1076,7 +1315,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);