Merge pull request #22111 from taosdata/szhou/tms-dev-2

enhance: table merge scan optimization
This commit is contained in:
dapan1121 2023-07-20 16:28:29 +08:00 committed by GitHub
commit fde8eabfd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 484 additions and 265 deletions

View File

@ -632,7 +632,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
pStart += colSize;
}
} else {
if (dataSize != 0) {
// ubsan reports error if pCol->pData==NULL && dataSize==0
memcpy(pStart, pCol->pData, dataSize);
}
pStart += dataSize;
}
}
@ -684,8 +687,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
return TSDB_CODE_FAILED;
}
}
if (colLength != 0) {
// ubsan reports error if colLength==0 && pCol->pData == 0
memcpy(pCol->pData, pStart, colLength);
}
pStart += colLength;
}

View File

@ -232,19 +232,20 @@ typedef struct STableMergeScanInfo {
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
SSDataBlock* pReaderBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
int32_t readIdx;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;

View File

@ -26,6 +26,7 @@ extern "C" {
enum {
SORT_MULTISOURCE_MERGE = 0x1,
SORT_SINGLESOURCE_SORT = 0x2,
SORT_BLOCK_TS_MERGE = 0x3
};
typedef struct SMultiMergeSource {
@ -53,6 +54,12 @@ typedef struct SMsortComparParam {
int32_t numOfSources;
SArray* orderInfo; // SArray<SBlockOrderInfo>
bool cmpGroupId;
int32_t sortType;
// the following field to speed up when sortType == SORT_BLOCK_TS_MERGE
int32_t tsSlotId;
int32_t order;
__compar_fn_t cmpFn;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
@ -70,8 +77,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize);
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
uint32_t pqSortBufSize);
void tsortSetForceUsePQSort(SSortHandle* pHandle);
@ -110,6 +117,10 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc
*/
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
/**
*
*/
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit);
/**
*
*/

View File

@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#include "executorInt.h"
#include "filter.h"
#include "function.h"
@ -55,8 +53,7 @@ typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
SSDataBlock* inputBlock;
STsdbReader* dataReader;
STsdbReader* reader;
} STableMergeScanSortSourceParam;
typedef struct STableCountScanOperatorInfo {
@ -2734,32 +2731,17 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
SSDataBlock* pBlock = pInfo->pReaderBlock;
int32_t code = 0;
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader) {
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader;
bool hasNext = false;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
STsdbReader* reader = pInfo->base.dataReader;
while (true) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2769,7 +2751,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
@ -2779,12 +2760,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
continue;
}
if (pQueryCond->order == TSDB_ORDER_ASC) {
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
} else {
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
}
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
@ -2806,16 +2781,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader);
pInfo->base.dataReader = NULL;
return pBlock;
}
pAPI->tsdReader.tsdReaderClose(source->dataReader);
source->dataReader = NULL;
pInfo->base.dataReader = NULL;
blockDataDestroy(source->inputBlock);
source->inputBlock = NULL;
return NULL;
}
@ -2851,6 +2819,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
{
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
@ -2867,53 +2837,29 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
pInfo->base.dataReader = NULL;
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
// pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize);
if (kWay >= 128) {
kWay = 128;
} else if (kWay <= 2) {
kWay = 2;
} else {
int i = 2;
while (i * 2 <= kWay) i = i * 2;
kWay = i;
}
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
int64_t mergeLimit = -1;
if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
taosArrayPush(pInfo->sortSourceParams, &param);
SQueryTableDataCond cond;
dumpQueryTableCond(&pInfo->base.cond, &cond);
taosArrayPush(pInfo->queryConds, &cond);
}
for (int32_t i = 0; i < numOfTable; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
ps->param = param;
ps->param = &param;
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
}
int32_t code = tsortOpen(pInfo->pSortHandle);
@ -2929,8 +2875,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
@ -2938,24 +2882,14 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
blockDataDestroy(param->inputBlock);
pAPI->tsdReader.tsdReaderClose(param->dataReader);
param->dataReader = NULL;
if (pInfo->base.dataReader != NULL) {
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
}
taosArrayClear(pInfo->sortSourceParams);
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
taosMemoryFree(cond->colList);
}
taosArrayDestroy(pInfo->queryConds);
pInfo->queryConds = NULL;
resetLimitInfoForNextGroup(&pInfo->limitInfo);
return TSDB_CODE_SUCCESS;
}
@ -2968,9 +2902,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pResBlock);
STupleHandle* pTupleHandle = NULL;
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
while (1) {
pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
@ -2989,7 +2924,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) {
break;
}
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
@ -3053,14 +2991,7 @@ void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);
for (int32_t i = 0; i < numOfTable; i++) {
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
blockDataDestroy(p->inputBlock);
pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
p->dataReader = NULL;
}
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
@ -3069,16 +3000,11 @@ void destroyTableMergeScanOperatorInfo(void* param) {
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList);
}
taosArrayDestroy(pTableScanInfo->queryConds);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
taosMemoryFreeClear(param);
@ -3140,6 +3066,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->base.scanFlag = MAIN_SCAN;
pInfo->base.readHandle = *readHandle;
pInfo->readIdx = -1;
pInfo->base.limitInfo.limit.limit = -1;
pInfo->base.limitInfo.slimit.limit = -1;
pInfo->base.pTableListInfo = pTableListInfo;
@ -3162,6 +3090,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
@ -3571,5 +3501,3 @@ static void destoryTableCountScanOperator(void* param) {
taosArrayDestroy(pTableCountScanInfo->stbUidList);
taosMemoryFreeClear(param);
}
// clang-format on

View File

@ -24,6 +24,7 @@
#include "tpagedbuf.h"
#include "tsort.h"
#include "tutil.h"
#include "tsimplehash.h"
struct STupleHandle {
SSDataBlock* pBlock;
@ -42,13 +43,15 @@ struct SSortHandle {
int64_t startTs;
uint64_t totalElapsed;
uint64_t maxRows;
uint32_t maxTupleLength;
uint32_t sortBufSize;
uint64_t pqMaxRows;
uint32_t pqMaxTupleLength;
uint32_t pqSortBufSize;
bool forceUsePQSort;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;
int64_t mergeLimit;
int32_t sourceId;
SSDataBlock* pDataBlock;
SMsortComparParam cmpParam;
@ -173,8 +176,8 @@ void destroyTuple(void* t) {
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize) {
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
uint32_t pqSortBufSize) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
@ -183,10 +186,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->loops = 0;
pSortHandle->maxTupleLength = maxTupleLength;
if (maxRows != 0) {
pSortHandle->sortBufSize = sortBufSize;
pSortHandle->maxRows = maxRows;
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
if (pqMaxRows != 0) {
pSortHandle->pqSortBufSize = pqSortBufSize;
pSortHandle->pqMaxRows = pqMaxRows;
}
pSortHandle->forceUsePQSort = false;
@ -194,10 +197,18 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
pSortHandle->mergeLimit = -1;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false;
pSortHandle->cmpParam.sortType = type;
if (type == SORT_BLOCK_TS_MERGE) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0);
pSortHandle->cmpParam.tsSlotId = pOrder->slotId;
pSortHandle->cmpParam.order = pOrder->order;
pSortHandle->cmpParam.cmpFn = (pOrder->order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
}
tsortSetComparFp(pSortHandle, msortComparFn);
if (idstr != NULL) {
@ -469,11 +480,14 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
pSource->pageIndex++;
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else {
if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex);
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, *pPgId);
@ -486,7 +500,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (code != TSDB_CODE_SUCCESS) {
return code;
}
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
@ -497,6 +510,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
qDebug("adjust merge tree. %d source completed", *numOfCompleted);
}
}
}
@ -577,21 +591,30 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
}
}
if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex;
int64_t* right1 = (int64_t*)(pRightColInfoData->pData) + pRightSource->src.rowIndex;
int ret = pParam->cmpFn(left1, right1);
return ret;
} else {
for (int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
} else {
leftNull =
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]);
}
}
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
if (pRightBlock->pBlockAgg == NULL) {
@ -626,6 +649,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
return ret;
}
}
}
return 0;
}
@ -668,6 +692,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for (int32_t i = 0; i < sortGroup; ++i) {
qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
@ -690,6 +715,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code;
}
int nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) {
if (tsortIsClosed(pHandle)) {
@ -720,8 +747,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
nMergedRows += pDataBlock->info.rows;
blockDataCleanup(pDataBlock);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
break;
}
}
sortComparCleanup(&pHandle->cmpParam);
@ -769,11 +800,240 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
return pgSize;
}
static int32_t createInitialSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
int32_t code = 0;
static int32_t createPageBuf(SSortHandle* pHandle) {
if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE;
qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
return terrno;
}
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"tableBlocksBuf", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return 0;
}
typedef struct SBlkMergeSupport {
int64_t** aTs;
int32_t* aRowIdx;
int32_t order;
} SBlkMergeSupport;
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
if (pSup->aRowIdx[left] == -1) {
return 1;
} else if (pSup->aRowIdx[right] == -1) {
return -1;
}
int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pSup->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
taosArrayPush(aPgId, &pageId);
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage, blk);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
return 0;
}
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
int sz = 0;
int numCols = taosArrayGetSize(blk->pDataBlock);
if (!blk->info.hasVarCol) {
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
sz += blockDataGetRowSize(blk);
} else {
for (int32_t i = 0; i < numCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if (pColInfoData->varmeta.offset[row] != -1) {
char* p = colDataGetData(pColInfoData, row);
sz += varDataTLen(p);
}
sz += sizeof(pColInfoData->varmeta.offset[0]);
} else {
sz += pColInfoData->info.bytes;
if (((rowIdxInPage) & 0x07) == 0) {
sz += 1; // bitmap
}
}
}
}
return sz;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlkMergeSupport sup;
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.order = order->order;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
}
int32_t totalRows = 0;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows;
}
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
SMultiwayMergeTreeInfo* pTree = NULL;
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
int32_t nRows = 0;
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
break;
}
}
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
blkPgSz += bufInc;
++nRows;
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
sup.aRowIdx[minIdx] = -1;
} else {
++sup.aRowIdx[minIdx];
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
}
blockDataCleanup(pHandle->pDataBlock);
}
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
tMergeTreeDestroy(&pTree);
return 0;
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
createPageBuf(pHandle);
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
blockDataMerge(tBlk, pBlk);
} else {
SSDataBlock* tBlk = createOneDataBlock(pBlk, true);
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
taosArrayPush(aBlkSort, &tBlk);
}
}
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
qDebug("source %zu created", taosArrayGetSize(aExtSrc));
}
if (pBlk == NULL) {
break;
};
}
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
taosArrayDestroy(aExtSrc);
pHandle->type = SORT_SINGLESOURCE_SORT;
return 0;
}
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
*pSource = NULL;
@ -833,7 +1093,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -858,7 +1118,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
@ -875,8 +1135,18 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
code = doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
return code;
}
static int32_t createInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
code = createBlocksQuickSortInitialSources(pHandle);
} else if (pHandle->type == SORT_BLOCK_TS_MERGE) {
code = createBlocksMergeSortInitialSources(pHandle);
}
qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
return code;
}
@ -923,6 +1193,10 @@ void tsortSetClosed(SSortHandle* pHandle) {
atomic_store_8(&pHandle->closed, 2);
}
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) {
pHandle->mergeLimit = mergeLimit;
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
void* param) {
pHandle->fetchfp = fetchFp;
@ -1002,8 +1276,8 @@ void tsortSetForceUsePQSort(SSortHandle* pHandle) {
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
if (tsortIsForceUsePQSort(pHandle)) return true;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows;
uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->pqMaxRows;
}
static bool tsortPQCompFn(void* a, void* b, void* param) {
@ -1049,7 +1323,7 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
}
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, tupleComparFn);