From f86ce16e36f33b8823b8b60e9d1e2cb5b5e65158 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 23 Jul 2022 15:06:53 +0800 Subject: [PATCH] fix(stream): push retrive datablock --- source/libs/executor/src/timewindowoperator.c | 88 +++++++++++++------ 1 file changed, 60 insertions(+), 28 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d1ecab8e4a..0f1272c964 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -772,6 +772,41 @@ int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_f return midPos; } +int32_t comparePullWinKey(void* pKey, void* data, int32_t index) { + SArray* res = (SArray*)data; + SPullWindowInfo* pos = taosArrayGet(res, index); + SPullWindowInfo* pData = (SPullWindowInfo*) pKey; + if (pData->window.skey == pos->window.skey) { + if (pData->groupId > pos->groupId) { + return 1; + } else if (pData->groupId < pos->groupId) { + return -1; + } + return 0; + } else if (pData->window.skey > pos->window.skey) { + return 1; + } + return -1; +} + +static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { + int32_t size = taosArrayGetSize(pPullWins); + int32_t index = binarySearchCom(pPullWins, size, pPullInfo, TSDB_ORDER_DESC, comparePullWinKey); + if (index == -1) { + index = 0; + } else { + if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) { + index++; + } else { + return TSDB_CODE_SUCCESS; + } + } + if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; +} + int32_t compareResKey(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SResKeyPos* pos = taosArrayGetP(res, index); @@ -2586,7 +2621,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) { SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; // add pull data request - taosArrayPush(pInfo->pPullWins, &pull); + savePullWindow(&pull, pInfo->pPullWins); int32_t size = taosArrayGetSize(pInfo->pChildren); addPullWindow(pInfo->pPullDataMap, &winRes, size); qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); @@ -2674,14 +2709,6 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol blockDataUpdateTsWindow(pDest, 0); } -static bool needBreak(SStreamFinalIntervalOperatorInfo* pInfo) { - int32_t size = taosArrayGetSize(pInfo->pPullWins); - if (pInfo->pullIndex < size) { - return true; - } - return false; -} - static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) { clearSpecialDataBlock(pBlock); int32_t size = taosArrayGetSize(array); @@ -2748,6 +2775,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { + doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); + if (pInfo->pPullDataRes->info.rows != 0) { + // process the rest of the data + ASSERT(IS_FINAL_OP(pInfo)); + printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + return pInfo->pPullDataRes; + } + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; @@ -2776,13 +2811,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { // process the rest of the data return pInfo->pUpdateRes; } - doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); - if (pInfo->pPullDataRes->info.rows != 0) { - // process the rest of the data - ASSERT(IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); - return pInfo->pPullDataRes; - } + // doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); + // if (pInfo->pPullDataRes->info.rows != 0) { + // // process the rest of the data + // ASSERT(IS_FINAL_OP(pInfo)); + // printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + // return pInfo->pPullDataRes; + // } doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data @@ -2882,10 +2917,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); - - if (needBreak(pInfo)) { - break; - } } } @@ -2899,6 +2930,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + + doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); + if (pInfo->pPullDataRes->info.rows != 0) { + // process the rest of the data + ASSERT(IS_FINAL_OP(pInfo)); + printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + return pInfo->pPullDataRes; + } + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); @@ -2913,14 +2953,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pUpdateRes; } - doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); - if (pInfo->pPullDataRes->info.rows != 0) { - // process the rest of the data - ASSERT(IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); - return pInfo->pPullDataRes; - } - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data