From 08744dfb496c3ae34c769685e9aa55ece12ed802 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 6 Apr 2021 11:09:19 +0800 Subject: [PATCH] [td-2859]remove one operator. --- src/query/inc/qExecutor.h | 13 +++-- src/query/src/qExecutor.c | 100 ++++++++++++-------------------------- src/query/src/qPlan.c | 6 +-- 3 files changed, 37 insertions(+), 82 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b66385add4..c91b835980 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -283,13 +283,12 @@ enum OPERATOR_TYPE_E { OP_Arithmetic = 7, OP_Groupby = 8, OP_Limit = 9, - OP_Offset = 10, - OP_TimeWindow = 11, - OP_SessionWindow = 12, - OP_Fill = 13, - OP_MultiTableAggregate = 14, - OP_MultiTableTimeInterval = 15, - OP_DummyInput = 16, //TODO remove it after fully refactor. + OP_TimeWindow = 10, + OP_SessionWindow = 11, + OP_Fill = 12, + OP_MultiTableAggregate = 13, + OP_MultiTableTimeInterval = 14, + OP_DummyInput = 15, //TODO remove it after fully refactor. }; typedef struct SOperatorInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d80912db48..0c5686b216 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -163,7 +163,6 @@ static SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryR static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -1739,11 +1738,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } - case OP_Offset: { - pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - break; - } - case OP_Fill: { SOperatorInfo* pInfo = pRuntimeEnv->proot; pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); @@ -4535,23 +4529,45 @@ static SSDataBlock* doArithmeticOperation(void* param) { } static SSDataBlock* doLimit(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; + SOperatorInfo* pOperator = (SOperatorInfo*)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } SLimitOperatorInfo* pInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; + SSDataBlock* pBlock = NULL; + while (1) { + pBlock = pOperator->upstream->exec(pOperator->upstream); + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + if (pRuntimeEnv->currentOffset == 0) { + break; + } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { + pRuntimeEnv->currentOffset -= pBlock->info.rows; + } else { + int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); + pBlock->info.rows = remain; + + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColInfoData->info.bytes; + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); + } + + pRuntimeEnv->currentOffset = 0; + break; + } } if (pInfo->total + pBlock->info.rows >= pInfo->limit) { - pBlock->info.rows = (int32_t) (pInfo->limit - pInfo->total); - + pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); pInfo->total = pInfo->limit; setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); @@ -4563,44 +4579,6 @@ static SSDataBlock* doLimit(void* param) { return pBlock; } -// TODO add log -static SSDataBlock* doOffset(void* param) { - SOperatorInfo *pOperator = (SOperatorInfo *)param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - - while (1) { - SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); - if (pBlock == NULL) { - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } - - if (pRuntimeEnv->currentOffset == 0) { - return pBlock; - } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { - pRuntimeEnv->currentOffset -= pBlock->info.rows; - } else { - int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); - pBlock->info.rows = remain; - - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); - } - - pRuntimeEnv->currentOffset = 0; - return pBlock; - } - } -} - static SSDataBlock* doIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5025,24 +5003,6 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { - SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); - - pInfo->offset = pRuntimeEnv->pQueryAttr->limit.offset; - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - - pOperator->name = "OffsetOperator"; - pOperator->operatorType = OP_Offset; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->upstream = upstream; - pOperator->exec = doOffset; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - - return pOperator; -} - SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 7429004289..8ddb2bbd61 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -116,12 +116,8 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } - if (pQueryAttr->limit.offset > 0) { - op = OP_Offset; - taosArrayPush(plan, &op); - } - if (pQueryAttr->limit.limit > 0) { + if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { op = OP_Limit; taosArrayPush(plan, &op); }