|
|
|
@ -179,6 +179,7 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
|
|
|
|
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
|
|
|
|
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
|
|
|
|
|
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
|
|
|
|
|
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
|
|
|
|
|
|
|
|
|
|
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
|
|
|
|
|
|
|
|
|
@ -1665,20 +1666,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|
|
|
|
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qDebug("QInfo:%p init runtime completed", pRuntimeEnv->qinfo);
|
|
|
|
|
qDebug("QInfo:%p init runtime environment completed", pRuntimeEnv->qinfo);
|
|
|
|
|
|
|
|
|
|
// group by normal column, sliding window query, interval query are handled by interval query processor
|
|
|
|
|
// interval (down sampling operation)
|
|
|
|
|
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
|
|
|
|
if (onlyQueryTags(pQuery)) { // do nothing for tags query
|
|
|
|
|
|
|
|
|
|
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
|
|
|
|
if (pQuery->stableQuery) {
|
|
|
|
|
pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner,
|
|
|
|
|
pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
|
|
|
|
} else {
|
|
|
|
|
pRuntimeEnv->proot = createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
|
|
|
|
|
|
|
|
|
if (pQuery->pExpr2 != NULL) {
|
|
|
|
|
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
|
|
|
@ -1688,7 +1694,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else if (pQuery->groupbyColumn) {
|
|
|
|
|
pRuntimeEnv->proot = createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
|
|
|
|
|
|
|
|
|
if (pQuery->pExpr2 != NULL) {
|
|
|
|
@ -1696,9 +1703,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|
|
|
|
}
|
|
|
|
|
} else if (isFixedOutputQuery(pQuery)) {
|
|
|
|
|
if (pQuery->stableQuery && !isTsCompQuery(pQuery)) {
|
|
|
|
|
pRuntimeEnv->proot = createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
} else {
|
|
|
|
|
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
|
|
|
@ -1709,7 +1718,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|
|
|
|
} else { // diff/add/multiply/subtract/division
|
|
|
|
|
assert(pQuery->checkResultBuf == 1);
|
|
|
|
|
if (!onlyQueryTags(pQuery)) {
|
|
|
|
|
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -3881,9 +3891,6 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
|
|
|
|
|
// TODO set the tags scan handle
|
|
|
|
|
if (onlyQueryTags(pQuery)) {
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
// } else if (isTsCompQuery(pQuery)) {
|
|
|
|
|
// setTableQueryHandle(pRuntimeEnv, 0);
|
|
|
|
|
// return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
|
|
|
|
@ -4011,22 +4018,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t ps = DEFAULT_PAGE_SIZE;
|
|
|
|
|
int32_t rowsize = 0;
|
|
|
|
|
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQuery->intermediateResultRowSize);
|
|
|
|
|
int32_t TENMB = 1024*1024*10;
|
|
|
|
|
|
|
|
|
|
if (isSTableQuery && !onlyQueryTags(pQuery)) {
|
|
|
|
|
int32_t TENMB = 1024*1024*10;
|
|
|
|
|
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
} else if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
|
|
|
|
|
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
|
|
|
|
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// create runtime environment
|
|
|
|
|
int32_t numOfTables = pQuery->tableGroupInfo.numOfTables;
|
|
|
|
@ -4916,6 +4914,11 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
|
|
|
|
|
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|
|
|
|
STagScanInfo* pInfo = (STagScanInfo*) param;
|
|
|
|
|
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
|
|
|
|
|
|
|
|
|
@ -5191,13 +5194,14 @@ static SSDataBlock* doTagScan(void* param) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count);
|
|
|
|
|
} /*else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
|
|
|
|
|
*(int64_t*) pQuery->sdata[0]->data = num;
|
|
|
|
|
|
|
|
|
|
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
|
|
|
|
|
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
|
|
|
|
*(int64_t*)pColInfo->pData = pInfo->totalTables;
|
|
|
|
|
count = 1;
|
|
|
|
|
SET_STABLE_QUERY_OVER(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
pOperator->status = OP_EXEC_DONE;
|
|
|
|
|
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count);
|
|
|
|
|
}*/ else { // return only the tags|table name etc.
|
|
|
|
|
} else { // return only the tags|table name etc.
|
|
|
|
|
SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo
|
|
|
|
|
|
|
|
|
|
count = 0;
|
|
|
|
@ -5231,6 +5235,10 @@ static SSDataBlock* doTagScan(void* param) {
|
|
|
|
|
count += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pInfo->currentIndex >= pInfo->totalTables) {
|
|
|
|
|
pOperator->status = OP_EXEC_DONE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -5258,6 +5266,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
|
|
|
|
pOperator->pExpr = pExpr;
|
|
|
|
|
pOperator->numOfOutput = numOfOutput;
|
|
|
|
|
pOperator->pRuntimeEnv = pRuntimeEnv;
|
|
|
|
|
pOperator->cleanup = destroyTagScanOperatorInfo;
|
|
|
|
|
|
|
|
|
|
return pOperator;
|
|
|
|
|
}
|
|
|
|
|