From 253cd955a1db5a73dfcc3472e24c384cef04ab48 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 13 Jun 2022 09:56:39 +0800 Subject: [PATCH 1/2] feat: add explain analyze to multiway merge operator --- source/libs/command/src/explain.c | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 347fb326bb..14d4adc98b 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -878,6 +878,39 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (EXPLAIN_MODE_ANALYZE == ctx->mode) { + // sort key + EXPLAIN_ROW_NEW(level + 1, "Sort Key: "); + if (pResNode->pExecInfo) { + for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { + SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i); + EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); + } + } + + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + // sort method + EXPLAIN_ROW_NEW(level + 1, "Sort Method: "); + + int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); + SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0); + SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo; + EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort"); + if (pExecInfo->sortBuffer > 1024 * 1024) { + EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0)); + } else if (pExecInfo->sortBuffer > 1024) { + EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0)); + } else { + EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer); + } + + EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + } if (verbose) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); From 140e15bc8d742ab5e8bf226f093f9eece9fba402 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 13 Jun 2022 15:07:54 +0800 Subject: [PATCH 2/2] fix: merge interval operator optimization and explain analyze multimerge operator --- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + source/libs/command/src/explain.c | 2 +- source/libs/executor/src/timewindowoperator.c | 134 +++++------------- 3 files changed, 41 insertions(+), 96 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c07aed86b0..595b2e117b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3157,6 +3157,7 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) { } // handle data in cache situation +// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid) bool tsdbNextDataBlock(tsdbReaderT pHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index d894491775..960f8dc9ac 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1036,7 +1036,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (EXPLAIN_MODE_ANALYZE == ctx->mode) { // sort key - EXPLAIN_ROW_NEW(level + 1, "Sort Key: "); + EXPLAIN_ROW_NEW(level + 1, "Merge Key: "); if (pResNode->pExecInfo) { for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e70a4c413c..9a7045110c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3195,7 +3195,6 @@ _error: typedef struct SMergeIntervalAggOperatorInfo { SIntervalAggOperatorInfo intervalAggOperatorInfo; - SHashObj* groupIntervalHash; bool hasGroupId; uint64_t groupId; SSDataBlock* prefetchedBlock; @@ -3204,39 +3203,24 @@ typedef struct SMergeIntervalAggOperatorInfo { void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; - taosHashCleanup(miaInfo->groupIntervalHash); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); } -static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, - STimeWindow* newWin) { +static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) { SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); - STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); - if (prevWin == NULL) { - taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); - return 0; - } + SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + ASSERT(p1 != NULL); - if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) { - SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, - GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(p1 != NULL); - - finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr, - pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, - pTaskInfo); - taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - if (newWin == NULL) { - taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); - } else { - taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); - } - } + finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr, + pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, + pTaskInfo); + taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); return 0; } @@ -3252,13 +3236,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* int32_t numOfOutput = pOperatorInfo->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, iaInfo); uint64_t tableGroupId = pBlock->info.groupId; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, - iaInfo->interval.precision, &iaInfo->win); + STimeWindow win; + win.skey = blockStartTs; + win.ekey = taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; + //TODO: remove the hash table usage (groupid + winkey => result row position) int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); @@ -3266,72 +3251,39 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - TSKEY ekey = ascScan ? win.ekey : win.skey; - int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); - ASSERT(forwardRows > 0); - - // prev time window not interpolation yet. - if (iaInfo->timeWindowInterpo) { - SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); - doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); - - // restore current time window - ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, - pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - // window start key interpolation - doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows); - } - - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, iaInfo->order); - doCloseWindow(pResultRowInfo, iaInfo, pResult); - - // output previous interval results after this interval (&win) is closed - outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win); - - STimeWindow nextWin = win; + TSKEY currTs = blockStartTs; + TSKEY currPos = startPos; + STimeWindow currWin = win; while (1) { - int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); - if (startPos < 0) { + ++currPos; + if (currPos >= pBlock->info.rows) { break; } + if (tsCols[currPos] == currTs) { + continue; + } else { + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); + doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, + currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - // null data, failed to allocate more memory buffer - int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, - &iaInfo->aggSup, pTaskInfo); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + + currTs = tsCols[currPos]; + currWin.skey = currTs; + currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; + startPos = currPos; + ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, + numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } } - - ekey = ascScan ? nextWin.ekey : nextWin.skey; - forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); - - // window start(end) key interpolation - doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos, - forwardRows); - - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - doCloseWindow(pResultRowInfo, iaInfo, pResult); - - // output previous interval results after this interval (&nextWin) is closed - outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin); } + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); + doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, + currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - if (iaInfo->timeWindowInterpo) { - saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols); - } + outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); } static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { @@ -3385,13 +3337,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { } pRes->info.groupId = miaInfo->groupId; - } else { - void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL); - if (p != NULL) { - size_t len = 0; - uint64_t* pKey = taosHashGetKey(p, &len); - outputPrevIntervalResult(pOperator, *pKey, pRes, NULL); - } } if (pRes->info.rows == 0) { @@ -3421,7 +3366,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI iaInfo->execModel = pTaskInfo->execModel; iaInfo->primaryTsIndex = primaryTsSlotId; - miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096);