refactor(query): refactor the executor module of query.

This commit is contained in:
Haojun Liao 2022-04-15 23:02:26 +08:00
parent 774e4ad62c
commit e6a4af8ed8
2 changed files with 48 additions and 95 deletions

View File

@ -81,9 +81,9 @@ typedef struct SResultInfo { // TODO refactor
} SResultInfo; } SResultInfo;
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts TSKEY lastKey; // last check ts, todo remove it later
uint64_t uid; // table uid uint64_t uid; // table uid
int32_t groupIndex; // group id in table list // int32_t groupIndex; // group id in table list
// SVariant tag; // SVariant tag;
SResultRowInfo resInfo; // result info SResultRowInfo resInfo; // result info
} STableQueryInfo; } STableQueryInfo;
@ -645,8 +645,6 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
#endif #endif

View File

@ -3173,17 +3173,14 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not. * is a previous result generated or not.
*/ */
void setIntervalQueryRange(STaskRuntimeEnv* pRuntimeEnv, TSKEY key) { void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo; SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
if (pResultRowInfo->curPos != -1) { if (pResultRowInfo->curPos != -1) {
return; return;
} }
// pTableQueryInfo->win.skey = key; // pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey}; STimeWindow win = {.skey = key, .ekey = pQRange->ekey};
/** /**
* In handling the both ascending and descending order super table query, we need to find the first qualified * In handling the both ascending and descending order super table query, we need to find the first qualified
@ -5082,8 +5079,8 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgro
TSKEY_MIN_SUB(key, -1); TSKEY_MIN_SUB(key, -1);
} }
setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, // setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current,
pAggInfo); // pAggInfo);
doAggregateImpl(pOperator, 0, pInfo->pCtx); doAggregateImpl(pOperator, 0, pInfo->pCtx);
} }
@ -5288,21 +5285,22 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->getStreamResFn(pOperator, newgroup); return pOperator->getStreamResFn(pOperator, newgroup);
} else {
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
@ -5411,20 +5409,18 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
return NULL; return NULL;
} }
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableIntervalOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
// copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->binfo.pRes;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
@ -5437,25 +5433,30 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pBlock->info.groupId);
} }
pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo);
doCloseAllTimeWindow(pRuntimeEnv); finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { OPTR_SET_OPENED(pOperator);
pOperator->status = OP_EXEC_DONE;
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
} }
return pIntervalInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
@ -5491,9 +5492,9 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
// setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); // setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); // setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); // hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
@ -5874,7 +5875,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
pTQueryInfo->uid = pk->uid; pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey; pTQueryInfo->lastKey = pk->lastKey;
pTQueryInfo->groupIndex = i; // pTQueryInfo->groupIndex = i;
} }
} }
@ -6276,52 +6277,6 @@ _error:
return NULL; return NULL;
} }
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->getNextFn = doSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "AllMultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->getNextFn = doAllSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal); struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal);
@ -7480,7 +7435,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
pOperator->name = "JoinOperator"; pOperator->name = "JoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->blockingOptr = true; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;