[td-4529] diff support super table query.
This commit is contained in:
parent
7082d8a5e0
commit
b0281578d5
|
@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
|
||||||
*/
|
*/
|
||||||
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
|
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
|
||||||
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
|
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
|
||||||
|
bool tscIsDiffQuery(SQueryInfo* pQueryInfo);
|
||||||
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
|
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
|
||||||
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
|
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
|
||||||
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
|
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
|
||||||
|
@ -132,7 +133,6 @@ bool hasTagValOutput(SQueryInfo* pQueryInfo);
|
||||||
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
|
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
|
||||||
bool isStabledev(SQueryInfo* pQueryInfo);
|
bool isStabledev(SQueryInfo* pQueryInfo);
|
||||||
bool isTsCompQuery(SQueryInfo* pQueryInfo);
|
bool isTsCompQuery(SQueryInfo* pQueryInfo);
|
||||||
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
|
|
||||||
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
|
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
|
||||||
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
|
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
|
||||||
|
|
||||||
|
@ -331,7 +331,7 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
|
||||||
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
|
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
|
||||||
|
|
||||||
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
|
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
|
||||||
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
|
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId);
|
||||||
|
|
||||||
void* malloc_throw(size_t size);
|
void* malloc_throw(size_t size);
|
||||||
void* calloc_throw(size_t nmemb, size_t size);
|
void* calloc_throw(size_t nmemb, size_t size);
|
||||||
|
|
|
@ -319,7 +319,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
||||||
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||||
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
|
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
|
||||||
|
|
||||||
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
|
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
|
||||||
void destroyTableNameList(SInsertStatementParam* pInsertParam);
|
void destroyTableNameList(SInsertStatementParam* pInsertParam);
|
||||||
|
|
||||||
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
|
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
|
||||||
|
|
|
@ -144,7 +144,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// local merge has handle this situation during super table non-projection query.
|
// local merge has handle this situation during super table non-projection query.
|
||||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
|
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) {
|
||||||
pRes->numOfClauseTotal += pRes->numOfRows;
|
pRes->numOfClauseTotal += pRes->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->fp = fp;
|
pSql->fp = fp;
|
||||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,14 +257,14 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
|
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
|
||||||
// in case of show command, return no data
|
// in case of show command, return no data
|
||||||
(*pSql->fetchFp)(param, pSql, 0);
|
(*pSql->fetchFp)(param, pSql, 0);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
} else { // current query is not completed, continue retrieve from node
|
} else { // current query is not completed, continue retrieve from node
|
||||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -323,7 +323,7 @@ TAOS_ROW tscFetchRow(void *param) {
|
||||||
// current data set are exhausted, fetch more data from node
|
// current data set are exhausted, fetch more data from node
|
||||||
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
|
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
|
||||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||||
pCmd->command == TSDB_SQL_FETCH ||
|
pCmd->command == TSDB_SQL_FETCH ||
|
||||||
pCmd->command == TSDB_SQL_SHOW ||
|
pCmd->command == TSDB_SQL_SHOW ||
|
||||||
|
|
|
@ -2928,8 +2928,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||||
const char* msg1 = "TWA not allowed to apply to super table directly";
|
const char* msg1 = "TWA/Diff not allowed to apply to super table directly";
|
||||||
const char* msg2 = "TWA only support group by tbname for super table query";
|
const char* msg2 = "TWA/Diff only support group by tbname for super table query";
|
||||||
const char* msg3 = "function not support for super table query";
|
const char* msg3 = "function not support for super table query";
|
||||||
|
|
||||||
// filter sql function not supported by metric query yet.
|
// filter sql function not supported by metric query yet.
|
||||||
|
@ -2942,7 +2942,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscIsTWAQuery(pQueryInfo)) {
|
if (tscIsTWAQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) {
|
||||||
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
|
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
|
||||||
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
return true;
|
return true;
|
||||||
|
@ -6276,7 +6276,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
|
if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
|
||||||
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
|
functId != TSDB_FUNC_DIFF && functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
|
||||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6758,7 +6758,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
const char* msg6 = "from missing in subclause";
|
const char* msg6 = "from missing in subclause";
|
||||||
const char* msg7 = "time interval is required";
|
const char* msg7 = "time interval is required";
|
||||||
const char* msg8 = "the first column should be primary timestamp column";
|
const char* msg8 = "the first column should be primary timestamp column";
|
||||||
|
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
||||||
assert(pQueryInfo->numOfTables == 1);
|
assert(pQueryInfo->numOfTables == 1);
|
||||||
|
@ -7719,6 +7719,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
||||||
|
|
||||||
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
|
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
|
||||||
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
|
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
|
||||||
|
pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo);
|
||||||
|
|
||||||
SExprInfo** p = NULL;
|
SExprInfo** p = NULL;
|
||||||
int32_t numOfExpr = 0;
|
int32_t numOfExpr = 0;
|
||||||
|
|
|
@ -1588,7 +1588,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
|
||||||
return tscLocalResultCommonBuilder(pSql, numOfRes);
|
return tscLocalResultCommonBuilder(pSql, numOfRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
|
@ -1615,10 +1615,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
||||||
taosArrayPush(group, &tableKeyInfo);
|
taosArrayPush(group, &tableKeyInfo);
|
||||||
taosArrayPush(tableGroupInfo.pGroupList, &group);
|
taosArrayPush(tableGroupInfo.pGroupList, &group);
|
||||||
|
|
||||||
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE);
|
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self);
|
||||||
|
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t localQueryId = 0;
|
uint64_t localQueryId = pSql->self;
|
||||||
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
|
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
|
||||||
convertQueryResult(pRes, pQueryInfo);
|
convertQueryResult(pRes, pQueryInfo);
|
||||||
|
|
||||||
|
@ -2689,7 +2690,7 @@ void tscInitMsgsFp() {
|
||||||
|
|
||||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
|
||||||
|
|
||||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
|
||||||
|
|
||||||
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
|
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
|
||||||
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
|
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
|
||||||
|
|
|
@ -456,7 +456,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) {
|
||||||
|
|
||||||
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
|
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
|
||||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||||
pCmd->command == TSDB_SQL_FETCH ||
|
pCmd->command == TSDB_SQL_FETCH ||
|
||||||
pCmd->command == TSDB_SQL_SHOW ||
|
pCmd->command == TSDB_SQL_SHOW ||
|
||||||
|
|
|
@ -2422,7 +2422,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||||
|
|
||||||
// pRes->code check only serves in launching metric sub-queries
|
// pRes->code check only serves in launching metric sub-queries
|
||||||
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
|
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||||
pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function.
|
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
|
||||||
return pRes->code;
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2778,7 +2778,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
||||||
if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
|
if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
|
||||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
|
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
|
||||||
} else {
|
} else {
|
||||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
|
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo);
|
tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo);
|
||||||
|
@ -3500,7 +3500,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
|
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
|
||||||
char* sql, void* merger, int32_t stage) {
|
char* sql, void* merger, int32_t stage, uint64_t qId) {
|
||||||
assert(pQueryInfo != NULL);
|
assert(pQueryInfo != NULL);
|
||||||
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
||||||
if (pQInfo == NULL) {
|
if (pQInfo == NULL) {
|
||||||
|
@ -3509,7 +3509,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
|
||||||
|
|
||||||
// to make sure third party won't overwrite this structure
|
// to make sure third party won't overwrite this structure
|
||||||
pQInfo->signature = pQInfo;
|
pQInfo->signature = pQInfo;
|
||||||
|
pQInfo->qId = qId;
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQueryAttr *pQueryAttr = &pQInfo->query;
|
SQueryAttr *pQueryAttr = &pQInfo->query;
|
||||||
|
|
||||||
|
|
|
@ -222,6 +222,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
||||||
functionId != TSDB_FUNC_TS &&
|
functionId != TSDB_FUNC_TS &&
|
||||||
functionId != TSDB_FUNC_ARITHM &&
|
functionId != TSDB_FUNC_ARITHM &&
|
||||||
functionId != TSDB_FUNC_TS_COMP &&
|
functionId != TSDB_FUNC_TS_COMP &&
|
||||||
|
functionId != TSDB_FUNC_DIFF &&
|
||||||
|
functionId != TSDB_FUNC_TS_DUMMY &&
|
||||||
functionId != TSDB_FUNC_TID_TAG) {
|
functionId != TSDB_FUNC_TID_TAG) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -434,6 +436,23 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool tscIsDiffQuery(SQueryInfo* pQueryInfo) {
|
||||||
|
size_t num = tscNumOfExprs(pQueryInfo);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||||
|
if (pExpr == NULL || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pExpr->base.functionId == TSDB_FUNC_DIFF) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
|
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
|
||||||
return pQueryInfo->sessionWindow.gap > 0;
|
return pQueryInfo->sessionWindow.gap > 0;
|
||||||
}
|
}
|
||||||
|
@ -467,42 +486,12 @@ bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isSimpleAggregate(SQueryInfo* pQueryInfo) {
|
|
||||||
if (pQueryInfo->interval.interval > 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note:top/bottom query is fixed output query
|
|
||||||
if (tscIsTopBotQuery(pQueryInfo) || tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
|
|
||||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
|
||||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
|
||||||
if (pExpr == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t functionId = pExpr->base.functionId;
|
|
||||||
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!IS_MULTIOUTPUT(aAggs[functionId].status)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
|
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
|
||||||
if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) {
|
if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) {
|
if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -812,6 +801,7 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
|
||||||
SJoinStatus* pStatus = &pJoinInfo->status[i];
|
SJoinStatus* pStatus = &pJoinInfo->status[i];
|
||||||
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
|
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
|
||||||
|
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
|
||||||
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
|
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
|
||||||
pStatus->index = 0;
|
pStatus->index = 0;
|
||||||
|
|
||||||
|
@ -1088,7 +1078,9 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t
|
||||||
tfree(tableCols);
|
tfree(tableCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) {
|
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) {
|
||||||
|
SSqlRes* pOutput = &pSql->res;
|
||||||
|
|
||||||
// handle the following query process
|
// handle the following query process
|
||||||
if (px->pQInfo == NULL) {
|
if (px->pQInfo == NULL) {
|
||||||
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
|
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
|
||||||
|
@ -1166,7 +1158,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
|
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest query is ready", pSql->self, pSql->self);
|
||||||
|
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self);
|
||||||
|
|
||||||
tfree(pColumnInfo);
|
tfree(pColumnInfo);
|
||||||
tfree(schema);
|
tfree(schema);
|
||||||
|
|
||||||
|
@ -1174,7 +1168,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
||||||
pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv;
|
pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t qId = 0;
|
uint64_t qId = pSql->self;
|
||||||
qTableQuery(px->pQInfo, &qId);
|
qTableQuery(px->pQInfo, &qId);
|
||||||
convertQueryResult(pOutput, px);
|
convertQueryResult(pOutput, px);
|
||||||
}
|
}
|
||||||
|
@ -1372,7 +1366,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
|
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
int32_t cmd = pCmd->command;
|
int32_t cmd = pCmd->command;
|
||||||
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||||
tscRemoveFromSqlList(pSql);
|
tscRemoveFromSqlList(pSql);
|
||||||
}
|
}
|
||||||
|
@ -3436,7 +3430,7 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
||||||
handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res);
|
handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, pSql);
|
||||||
|
|
||||||
pSql->res.qId = -1;
|
pSql->res.qId = -1;
|
||||||
if (pSql->res.code == TSDB_CODE_SUCCESS) {
|
if (pSql->res.code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -4236,10 +4230,11 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
||||||
pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo);
|
pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo);
|
||||||
pQueryAttr->stabledev = isStabledev(pQueryInfo);
|
pQueryAttr->stabledev = isStabledev(pQueryInfo);
|
||||||
pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo);
|
pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo);
|
||||||
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
|
pQueryAttr->diffQuery = pQueryInfo->diffQuery;
|
||||||
|
pQueryAttr->simpleAgg = pQueryInfo->simpleAgg;
|
||||||
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
|
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
|
||||||
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
|
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
|
||||||
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
|
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && pQueryInfo->groupbyColumn;
|
||||||
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
||||||
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
||||||
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
||||||
|
@ -4255,7 +4250,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
||||||
pQueryAttr->fillType = pQueryInfo->fillType;
|
pQueryAttr->fillType = pQueryInfo->fillType;
|
||||||
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
|
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
|
||||||
|
|
||||||
|
|
||||||
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
|
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
|
||||||
pQueryAttr->window = pQueryInfo->window;
|
pQueryAttr->window = pQueryInfo->window;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -76,7 +76,7 @@ enum {
|
||||||
// SQL below for client local
|
// SQL below for client local
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_GLOBALMERGE, "retrieve-globalmerge" )
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" )
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" )
|
||||||
|
|
||||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table")
|
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table")
|
||||||
|
|
|
@ -185,6 +185,7 @@ typedef struct SQueryAttr {
|
||||||
bool queryBlockDist; // if query data block distribution
|
bool queryBlockDist; // if query data block distribution
|
||||||
bool stabledev; // super table stddev query
|
bool stabledev; // super table stddev query
|
||||||
bool tsCompQuery; // is tscomp query
|
bool tsCompQuery; // is tscomp query
|
||||||
|
bool diffQuery; // is diff query
|
||||||
bool simpleAgg;
|
bool simpleAgg;
|
||||||
bool pointInterpQuery; // point interpolation query
|
bool pointInterpQuery; // point interpolation query
|
||||||
bool needReverseScan; // need reverse scan
|
bool needReverseScan; // need reverse scan
|
||||||
|
@ -386,6 +387,7 @@ typedef struct STableScanInfo {
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
|
|
||||||
int32_t tableIndex;
|
int32_t tableIndex;
|
||||||
|
int32_t prevGroupId; // previous table group id
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanInfo {
|
typedef struct STagScanInfo {
|
||||||
|
|
|
@ -138,6 +138,7 @@ typedef struct SQueryInfo {
|
||||||
bool hasFilter;
|
bool hasFilter;
|
||||||
bool onlyTagQuery;
|
bool onlyTagQuery;
|
||||||
bool orderProjectQuery;
|
bool orderProjectQuery;
|
||||||
|
bool diffQuery;
|
||||||
bool stateWindow;
|
bool stateWindow;
|
||||||
} SQueryInfo;
|
} SQueryInfo;
|
||||||
|
|
||||||
|
|
|
@ -3393,7 +3393,7 @@ enum {
|
||||||
};
|
};
|
||||||
|
|
||||||
static bool diff_function_setup(SQLFunctionCtx *pCtx) {
|
static bool diff_function_setup(SQLFunctionCtx *pCtx) {
|
||||||
if (function_setup(pCtx)) {
|
if (!function_setup(pCtx)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4606,7 +4606,7 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
|
|
||||||
pRateInfo->lastValue = v;
|
pRateInfo->lastValue = v;
|
||||||
pRateInfo->lastKey = primaryKey[index];
|
pRateInfo->lastKey = primaryKey[index];
|
||||||
|
|
||||||
SET_VAL(pCtx, 1, 1);
|
SET_VAL(pCtx, 1, 1);
|
||||||
|
|
||||||
// set has result flag
|
// set has result flag
|
||||||
|
@ -4630,7 +4630,7 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) {
|
||||||
static void rate_finalizer(SQLFunctionCtx *pCtx) {
|
static void rate_finalizer(SQLFunctionCtx *pCtx) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
if (pRateInfo->hasResult != DATA_SET_FLAG) {
|
if (pRateInfo->hasResult != DATA_SET_FLAG) {
|
||||||
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
||||||
return;
|
return;
|
||||||
|
@ -5215,7 +5215,7 @@ SAggFunctionInfo aAggs[] = {{
|
||||||
"diff",
|
"diff",
|
||||||
TSDB_FUNC_DIFF,
|
TSDB_FUNC_DIFF,
|
||||||
TSDB_FUNC_INVALID_ID,
|
TSDB_FUNC_INVALID_ID,
|
||||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS,
|
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
|
||||||
diff_function_setup,
|
diff_function_setup,
|
||||||
diff_function,
|
diff_function,
|
||||||
diff_function_f,
|
diff_function_f,
|
||||||
|
|
|
@ -2652,7 +2652,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
||||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
|
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
|
||||||
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
|
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
|
||||||
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
|
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
|
||||||
pRuntimeEnv->current->groupIndex);
|
pRuntimeEnv->current->groupIndex);
|
||||||
|
@ -4273,6 +4273,12 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
|
||||||
|
|
||||||
pRuntimeEnv->current = *pTableQueryInfo;
|
pRuntimeEnv->current = *pTableQueryInfo;
|
||||||
doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo);
|
doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo);
|
||||||
|
|
||||||
|
if (pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) {
|
||||||
|
*newgroup = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function never returns error?
|
// this function never returns error?
|
||||||
|
@ -4419,6 +4425,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
|
||||||
pInfo->reverseTimes = 0;
|
pInfo->reverseTimes = 0;
|
||||||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||||
pInfo->current = 0;
|
pInfo->current = 0;
|
||||||
|
// pInfo->prevGroupId = -1;
|
||||||
|
|
||||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||||
pOperator->name = "TableScanOperator";
|
pOperator->name = "TableScanOperator";
|
||||||
|
@ -4441,6 +4448,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
|
||||||
pInfo->reverseTimes = 0;
|
pInfo->reverseTimes = 0;
|
||||||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||||
pInfo->current = 0;
|
pInfo->current = 0;
|
||||||
|
pInfo->prevGroupId = -1;
|
||||||
|
|
||||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||||
pOperator->name = "TableSeqScanOperator";
|
pOperator->name = "TableSeqScanOperator";
|
||||||
|
@ -4545,6 +4553,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
|
||||||
pInfo->reverseTimes = reverseTime;
|
pInfo->reverseTimes = reverseTime;
|
||||||
pInfo->current = 0;
|
pInfo->current = 0;
|
||||||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||||
|
// pInfo->prevGroupId = -1;
|
||||||
|
|
||||||
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
|
||||||
pOptr->name = "DataBlocksOptimizedScanOperator";
|
pOptr->name = "DataBlocksOptimizedScanOperator";
|
||||||
|
@ -4923,10 +4932,17 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return result of the previous group in the firstly.
|
// Return result of the previous group in the firstly.
|
||||||
if (*newgroup && pRes->info.rows > 0) {
|
if (*newgroup) {
|
||||||
pArithInfo->existDataBlock = pBlock;
|
if (pRes->info.rows > 0) {
|
||||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
pArithInfo->existDataBlock = pBlock;
|
||||||
return pInfo->pRes;
|
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||||
|
return pInfo->pRes;
|
||||||
|
} else { // init output buffer for a new group data
|
||||||
|
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||||
|
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
|
||||||
|
}
|
||||||
|
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||||
|
@ -7442,7 +7458,6 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
||||||
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data);
|
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows;
|
|
||||||
qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,
|
qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,
|
||||||
pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total);
|
pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total);
|
||||||
|
|
||||||
|
|
|
@ -531,7 +531,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
|
||||||
} else {
|
} else {
|
||||||
if (pQueryAttr->queryBlockDist) {
|
if (pQueryAttr->queryBlockDist) {
|
||||||
op = OP_TableBlockInfoScan;
|
op = OP_TableBlockInfoScan;
|
||||||
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
|
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->diffQuery) {
|
||||||
op = OP_TableSeqScan;
|
op = OP_TableSeqScan;
|
||||||
} else if (pQueryAttr->needReverseScan) {
|
} else if (pQueryAttr->needReverseScan) {
|
||||||
op = OP_DataBlocksOptScan;
|
op = OP_DataBlocksOptScan;
|
||||||
|
@ -605,7 +605,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
||||||
taosArrayPush(plan, &op);
|
taosArrayPush(plan, &op);
|
||||||
}
|
}
|
||||||
} else if (pQueryAttr->simpleAgg) {
|
} else if (pQueryAttr->simpleAgg) {
|
||||||
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) {
|
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery && !pQueryAttr->diffQuery) {
|
||||||
op = OP_MultiTableAggregate;
|
op = OP_MultiTableAggregate;
|
||||||
} else {
|
} else {
|
||||||
op = OP_Aggregate;
|
op = OP_Aggregate;
|
||||||
|
|
|
@ -241,15 +241,16 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
||||||
|
|
||||||
bool newgroup = false;
|
bool newgroup = false;
|
||||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
|
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
|
||||||
|
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
|
||||||
|
|
||||||
if (isQueryKilled(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId);
|
qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId);
|
||||||
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
|
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
|
||||||
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, %"PRId64" rows are returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
|
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
|
||||||
pRuntimeEnv->resultInfo.total);
|
pRuntimeEnv->resultInfo.total);
|
||||||
} else {
|
} else {
|
||||||
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, numOfTotal:%" PRId64 " rows",
|
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId,
|
||||||
pQInfo->qId, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv));
|
GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total);
|
||||||
}
|
}
|
||||||
|
|
||||||
return doBuildResCheck(pQInfo);
|
return doBuildResCheck(pQInfo);
|
||||||
|
|
Loading…
Reference in New Issue