From 44123ec8f0bf0946fafde605d436b18c3c415d60 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 11 Aug 2022 10:19:06 +0800 Subject: [PATCH] feat(stream):optimzie stream interval --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 6 + source/libs/executor/src/scanoperator.c | 9 +- source/libs/executor/src/timewindowoperator.c | 103 ++++++++++++------ source/libs/stream/src/streamUpdate.c | 2 +- 5 files changed, 83 insertions(+), 38 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d460644633..d5711087e7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1017,6 +1017,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); +bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6de77f63f4..3ac08bc20e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1277,8 +1277,12 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { } void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { + bool init = false; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset); + if (init) { + continue; + } struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { @@ -1295,6 +1299,8 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } else { pResInfo->initialized = true; } + } else { + init = true; } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2f6d04f684..d8de8df163 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1174,10 +1174,15 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); + bool isClosed = false; + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; + if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { + win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); + isClosed = isCloseWindow(&win, &pInfo->twAggSup); + } // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) && + if ((update || (isSignleIntervalWindow(pInfo) && isClosed && isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 610303dc50..548bc046c2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -851,23 +851,34 @@ static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t g return TSDB_CODE_SUCCESS; } +static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { + SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + if (newPos == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + newPos->groupId = groupId; + newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + *(int64_t*)newPos->key = ts; + SWinRes key = {.ts = ts, .groupId = groupId}; + if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { + taosMemoryFree(newPos); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { + return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);; +} + static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); } -static void removeResult(SArray* pUpdated, SWinRes* pKey) { - int32_t size = taosArrayGetSize(pUpdated); - int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey); - if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) { - taosArrayRemove(pUpdated, index); - } -} - -static void removeResults(SArray* pWins, SArray* pUpdated) { +static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { SWinRes* pW = taosArrayGet(pWins, i); - removeResult(pUpdated, pW); + taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes)); } } @@ -894,11 +905,14 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { return -1; } -static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) { - int32_t upSize = taosArrayGetSize(pUpdated); +static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { + if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) { + return; + } int32_t delSize = taosArrayGetSize(pDelWins); - for (int32_t i = 0; i < upSize; i++) { - SResKeyPos* pResKey = taosArrayGetP(pUpdated, i); + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + SResKeyPos* pResKey = (SResKeyPos*)pIte; int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) { taosArrayRemove(pDelWins, index); @@ -914,7 +928,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); } static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, - int32_t scanFlag, SArray* pUpdated) { + int32_t scanFlag, SHashObj* pUpdatedMap) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -940,7 +954,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResultRow(pResult, tableGroupId, pUpdated); + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } } @@ -997,7 +1011,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResultRow(pResult, tableGroupId, pUpdated); + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } @@ -1437,7 +1451,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* } } -static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { +static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { void* pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { @@ -1446,7 +1460,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); SResultRowPosition* pPos = (SResultRowPosition*)pIte; - int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins); + int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1455,7 +1469,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { } static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, - SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages, + SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) { qDebug("===stream===close interval window"); void* pIte = NULL; @@ -1487,7 +1501,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, } SResultRowPosition* pPos = (SResultRowPosition*)pIte; if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); + int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1577,11 +1591,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { break; } + // qInfo("===stream===%ld", pBlock->info.version); printDataBlock(pBlock, "single interval recv"); if (pBlock->info.type == STREAM_CLEAR) { @@ -1594,7 +1611,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } @@ -1617,17 +1634,24 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); } pOperator->status = OP_RES_TO_RETURN; - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated, + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosHashCleanup(pUpdatedMap); + taosArraySort(pUpdated, resultrowComparAsc); + finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - removeDeleteResults(pUpdated, pInfo->pDelWins); + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { return pInfo->pDelRes; @@ -2831,7 +2855,7 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { } static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, - SArray* pUpdated) { + SHashObj* pUpdatedMap) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -2913,8 +2937,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResultRow(pResult, tableGroupId, pUpdated); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); @@ -3020,6 +3044,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); TSKEY maxTs = INT64_MIN; SExprSupp* pSup = &pOperator->exprSupp; @@ -3077,7 +3103,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { clearSpecialDataBlock(pInfo->pUpdateRes); - removeDeleteResults(pUpdated, pInfo->pDelWins); + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); break; @@ -3104,7 +3130,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { taosArrayDestroy(pUpWins); continue; } - removeResults(pUpWins, pUpdated); + removeResults(pUpWins, pUpdatedMap); copyDataBlock(pInfo->pUpdateRes, pBlock); // copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); pInfo->returnUpdate = true; @@ -3122,15 +3148,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated); continue; } - removeResults(pInfo->pDelWins, pUpdated); + removeResults(pInfo->pDelWins, pUpdatedMap); break; } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); - removeResults(pUpWins, pUpdated); + removeResults(pUpWins, pUpdatedMap); taosArrayDestroy(pUpWins); if (taosArrayGetSize(pUpdated) > 0) { break; @@ -3146,7 +3172,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); - doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); + doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -3171,12 +3197,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); if (IS_FINAL_OP(pInfo)) { closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, - pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); } else { pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; } + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosHashCleanup(pUpdatedMap); + taosArraySort(pUpdated, resultrowComparAsc); + finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index c686fa05ce..0b1ce27b77 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -124,7 +124,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma } pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->pCloseWinSBF = NULL; - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); pInfo->maxVersion = 0; pInfo->scanGroupId = 0;