diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 22af458933..9cf5e0c2bf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -81,9 +81,9 @@ typedef struct SResultInfo { // TODO refactor } SResultInfo; typedef struct STableQueryInfo { - TSKEY lastKey; // last check ts + TSKEY lastKey; // last check ts, todo remove it later uint64_t uid; // table uid - int32_t groupIndex; // group id in table list +// int32_t groupIndex; // group id in table list // SVariant tag; SResultRowInfo resInfo; // result info } STableQueryInfo; @@ -645,8 +645,6 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3667cecc51..fbed927dca 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3173,17 +3173,14 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there * is a previous result generated or not. */ -void setIntervalQueryRange(STaskRuntimeEnv* pRuntimeEnv, TSKEY key) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; +void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo; - if (pResultRowInfo->curPos != -1) { return; } - // pTableQueryInfo->win.skey = key; - STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey}; +// pTableQueryInfo->win.skey = key; + STimeWindow win = {.skey = key, .ekey = pQRange->ekey}; /** * In handling the both ascending and descending order super table query, we need to find the first qualified @@ -5082,8 +5079,8 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgro TSKEY_MIN_SUB(key, -1); } - setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, - pAggInfo); +// setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, +// pAggInfo); doAggregateImpl(pOperator, 0, pInfo->pCtx); } @@ -5288,21 +5285,22 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { return pOperator->getStreamResFn(pOperator, newgroup); + } else { + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + } + + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } - - pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; - } - - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); - - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - } - - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { @@ -5411,20 +5409,18 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup return NULL; } - STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableIntervalOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { int64_t st = taosGetTimestampUs(); - - // copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pIntervalInfo->binfo.pRes; + + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -5437,25 +5433,30 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup } // the pDataBlock are always the same one, no need to call this again - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); - setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; - hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); + setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); + hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pBlock->info.groupId); } - pOperator->status = OP_RES_TO_RETURN; - doCloseAllTimeWindow(pRuntimeEnv); - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + closeAllResultRows(&pInfo->binfo.resultRowInfo); + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; + initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); + OPTR_SET_OPENED(pOperator); + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); } - return pIntervalInfo->binfo.pRes; + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { @@ -5491,9 +5492,9 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); - setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); +// setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); - hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); +// hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); } pOperator->status = OP_RES_TO_RETURN; @@ -5874,7 +5875,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; pTQueryInfo->uid = pk->uid; pTQueryInfo->lastKey = pk->lastKey; - pTQueryInfo->groupIndex = i; +// pTQueryInfo->groupIndex = i; } } @@ -6276,52 +6277,6 @@ _error: return NULL; } -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput) { - STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); - - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "MultiTableTimeIntervalOperator"; - // pOperator->operatorType = OP_MultiTableTimeInterval; - pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - - pOperator->getNextFn = doSTableIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; - - int32_t code = appendDownstream(pOperator, &downstream, 1); - return pOperator; -} - -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput) { - STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); - - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "AllMultiTableTimeIntervalOperator"; - // pOperator->operatorType = OP_AllMultiTableTimeInterval; - pOperator->blockingOptr = true; - - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - - pOperator->getNextFn = doAllSTableIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; - - int32_t code = appendDownstream(pOperator, &downstream, 1); - - return pOperator; -} - static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal); @@ -7480,7 +7435,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->name = "JoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; - pOperator->blockingOptr = true; + pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols;