[td-2859]remove one operator.

This commit is contained in:
Haojun Liao 2021-04-06 11:09:19 +08:00
parent 0146d60cbb
commit 08744dfb49
3 changed files with 37 additions and 82 deletions

View File

@ -283,13 +283,12 @@ enum OPERATOR_TYPE_E {
OP_Arithmetic = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_Offset = 10,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_TimeWindow = 10,
OP_SessionWindow = 11,
OP_Fill = 12,
OP_MultiTableAggregate = 13,
OP_MultiTableTimeInterval = 14,
OP_DummyInput = 15, //TODO remove it after fully refactor.
};
typedef struct SOperatorInfo {

View File

@ -163,7 +163,6 @@ static SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryR
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
@ -1739,11 +1738,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break;
}
case OP_Offset: {
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
break;
}
case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput);
@ -4535,23 +4529,45 @@ static SSDataBlock* doArithmeticOperation(void* param) {
}
static SSDataBlock* doLimit(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SOperatorInfo* pOperator = (SOperatorInfo*)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (pRuntimeEnv->currentOffset == 0) {
break;
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes);
}
pRuntimeEnv->currentOffset = 0;
break;
}
}
if (pInfo->total + pBlock->info.rows >= pInfo->limit) {
pBlock->info.rows = (int32_t) (pInfo->limit - pInfo->total);
pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total);
pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
@ -4563,44 +4579,6 @@ static SSDataBlock* doLimit(void* param) {
return pBlock;
}
// TODO add log
static SSDataBlock* doOffset(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (pRuntimeEnv->currentOffset == 0) {
return pBlock;
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes);
}
pRuntimeEnv->currentOffset = 0;
return pBlock;
}
}
}
static SSDataBlock* doIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
@ -5025,24 +5003,6 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo));
pInfo->offset = pRuntimeEnv->pQueryAttr->limit.offset;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "OffsetOperator";
pOperator->operatorType = OP_Offset;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->exec = doOffset;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));

View File

@ -116,12 +116,8 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
}
if (pQueryAttr->limit.offset > 0) {
op = OP_Offset;
taosArrayPush(plan, &op);
}
if (pQueryAttr->limit.limit > 0) {
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
op = OP_Limit;
taosArrayPush(plan, &op);
}