Merge pull request #13766 from taosdata/szhou/feature/multiwaymerge
feature: explain mulitway merge and merge interval optimization
This commit is contained in:
commit
5507931dee
|
@ -3157,6 +3157,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle data in cache situation
|
// handle data in cache situation
|
||||||
|
// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid)
|
||||||
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
|
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
|
||||||
|
|
||||||
|
|
|
@ -1034,6 +1034,39 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
||||||
|
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
|
||||||
|
// sort key
|
||||||
|
EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
|
||||||
|
if (pResNode->pExecInfo) {
|
||||||
|
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
|
||||||
|
SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i);
|
||||||
|
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
||||||
|
// sort method
|
||||||
|
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
|
||||||
|
|
||||||
|
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
||||||
|
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
|
||||||
|
SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
|
||||||
|
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
|
||||||
|
if (pExecInfo->sortBuffer > 1024 * 1024) {
|
||||||
|
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
|
||||||
|
} else if (pExecInfo->sortBuffer > 1024) {
|
||||||
|
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
|
||||||
|
} else {
|
||||||
|
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
}
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
||||||
|
|
|
@ -3195,7 +3195,6 @@ _error:
|
||||||
typedef struct SMergeIntervalAggOperatorInfo {
|
typedef struct SMergeIntervalAggOperatorInfo {
|
||||||
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
||||||
|
|
||||||
SHashObj* groupIntervalHash;
|
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SSDataBlock* prefetchedBlock;
|
SSDataBlock* prefetchedBlock;
|
||||||
|
@ -3204,39 +3203,24 @@ typedef struct SMergeIntervalAggOperatorInfo {
|
||||||
|
|
||||||
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
|
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
|
||||||
taosHashCleanup(miaInfo->groupIntervalHash);
|
|
||||||
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
|
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
|
static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) {
|
||||||
STimeWindow* newWin) {
|
|
||||||
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
|
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
|
||||||
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
||||||
|
|
||||||
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
|
||||||
if (prevWin == NULL) {
|
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
return 0;
|
ASSERT(p1 != NULL);
|
||||||
}
|
|
||||||
|
|
||||||
if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) {
|
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
|
||||||
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
|
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
|
||||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
pTaskInfo);
|
||||||
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||||
ASSERT(p1 != NULL);
|
|
||||||
|
|
||||||
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
|
|
||||||
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
|
|
||||||
pTaskInfo);
|
|
||||||
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
|
||||||
if (newWin == NULL) {
|
|
||||||
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
|
||||||
} else {
|
|
||||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3252,13 +3236,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||||
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
|
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
|
||||||
uint64_t tableGroupId = pBlock->info.groupId;
|
uint64_t tableGroupId = pBlock->info.groupId;
|
||||||
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
|
||||||
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
STimeWindow win;
|
||||||
iaInfo->interval.precision, &iaInfo->win);
|
win.skey = blockStartTs;
|
||||||
|
win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
|
||||||
|
|
||||||
|
//TODO: remove the hash table usage (groupid + winkey => result row position)
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
||||||
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
||||||
|
@ -3266,72 +3251,39 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
TSKEY currTs = blockStartTs;
|
||||||
int32_t forwardRows =
|
TSKEY currPos = startPos;
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
|
STimeWindow currWin = win;
|
||||||
ASSERT(forwardRows > 0);
|
|
||||||
|
|
||||||
// prev time window not interpolation yet.
|
|
||||||
if (iaInfo->timeWindowInterpo) {
|
|
||||||
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
|
|
||||||
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
|
|
||||||
|
|
||||||
// restore current time window
|
|
||||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
|
||||||
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup,
|
|
||||||
pTaskInfo);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
|
|
||||||
// window start key interpolation
|
|
||||||
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
|
|
||||||
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
|
||||||
pBlock->info.rows, numOfOutput, iaInfo->order);
|
|
||||||
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
|
||||||
|
|
||||||
// output previous interval results after this interval (&win) is closed
|
|
||||||
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
|
|
||||||
|
|
||||||
STimeWindow nextWin = win;
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t prevEndPos = forwardRows - 1 + startPos;
|
++currPos;
|
||||||
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order);
|
if (currPos >= pBlock->info.rows) {
|
||||||
if (startPos < 0) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (tsCols[currPos] == currTs) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
||||||
|
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos,
|
||||||
|
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
|
||||||
|
|
||||||
// null data, failed to allocate more memory buffer
|
outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
|
||||||
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
|
||||||
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset,
|
currTs = tsCols[currPos];
|
||||||
&iaInfo->aggSup, pTaskInfo);
|
currWin.skey = currTs;
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1;
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
startPos = currPos;
|
||||||
|
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
|
||||||
|
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
|
||||||
forwardRows =
|
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
|
|
||||||
|
|
||||||
// window start(end) key interpolation
|
|
||||||
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
|
||||||
forwardRows);
|
|
||||||
|
|
||||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
|
|
||||||
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
|
||||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
|
|
||||||
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
|
||||||
|
|
||||||
// output previous interval results after this interval (&nextWin) is closed
|
|
||||||
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
|
|
||||||
}
|
}
|
||||||
|
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
||||||
|
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos,
|
||||||
|
currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
|
||||||
|
|
||||||
if (iaInfo->timeWindowInterpo) {
|
outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs);
|
||||||
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
@ -3385,13 +3337,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.groupId = miaInfo->groupId;
|
pRes->info.groupId = miaInfo->groupId;
|
||||||
} else {
|
|
||||||
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
|
|
||||||
if (p != NULL) {
|
|
||||||
size_t len = 0;
|
|
||||||
uint64_t* pKey = taosHashGetKey(p, &len);
|
|
||||||
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->info.rows == 0) {
|
if (pRes->info.rows == 0) {
|
||||||
|
@ -3421,7 +3366,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
||||||
iaInfo->execModel = pTaskInfo->execModel;
|
iaInfo->execModel = pTaskInfo->execModel;
|
||||||
|
|
||||||
iaInfo->primaryTsIndex = primaryTsSlotId;
|
iaInfo->primaryTsIndex = primaryTsSlotId;
|
||||||
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
Loading…
Reference in New Issue