commit
7241b62b96
|
@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
|
|||
*/
|
||||
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsDiffQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
|
||||
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
|
||||
|
@ -132,7 +133,6 @@ bool hasTagValOutput(SQueryInfo* pQueryInfo);
|
|||
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
|
||||
bool isStabledev(SQueryInfo* pQueryInfo);
|
||||
bool isTsCompQuery(SQueryInfo* pQueryInfo);
|
||||
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
|
||||
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
|
||||
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
|
||||
|
||||
|
@ -214,7 +214,7 @@ void tscColumnListDestroy(SArray* pColList);
|
|||
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
|
||||
void tscColumnListCopyAll(SArray* dst, const SArray* src);
|
||||
|
||||
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId);
|
||||
|
||||
void tscDequoteAndTrimToken(SStrToken* pToken);
|
||||
int32_t tscValidateName(SStrToken* pToken);
|
||||
|
@ -329,9 +329,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
|
|||
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
|
||||
|
||||
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
|
||||
|
||||
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
|
||||
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
|
||||
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId);
|
||||
|
||||
void* malloc_throw(size_t size);
|
||||
void* calloc_throw(size_t nmemb, size_t size);
|
||||
|
|
|
@ -320,7 +320,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
|||
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
|
||||
|
||||
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
|
||||
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
|
||||
void destroyTableNameList(SInsertStatementParam* pInsertParam);
|
||||
|
||||
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
|
||||
|
|
|
@ -144,7 +144,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
|
|||
}
|
||||
|
||||
// local merge has handle this situation during super table non-projection query.
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) {
|
||||
pRes->numOfClauseTotal += pRes->numOfRows;
|
||||
}
|
||||
|
||||
|
@ -174,7 +174,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
|||
}
|
||||
|
||||
pSql->fp = fp;
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
}
|
||||
|
||||
|
@ -257,14 +257,14 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
|
|||
}
|
||||
|
||||
return;
|
||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
|
||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
|
||||
// in case of show command, return no data
|
||||
(*pSql->fetchFp)(param, pSql, 0);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
} else { // current query is not completed, continue retrieve from node
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
}
|
||||
|
||||
|
|
|
@ -323,7 +323,7 @@ TAOS_ROW tscFetchRow(void *param) {
|
|||
// current data set are exhausted, fetch more data from node
|
||||
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
|
||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
pCmd->command == TSDB_SQL_SHOW ||
|
||||
|
|
|
@ -2157,11 +2157,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
case TSDB_FUNC_MIN:
|
||||
case TSDB_FUNC_MAX:
|
||||
case TSDB_FUNC_DIFF:
|
||||
case TSDB_FUNC_DERIVATIVE:
|
||||
case TSDB_FUNC_STDDEV:
|
||||
case TSDB_FUNC_LEASTSQR: {
|
||||
// 1. valid the number of parameters
|
||||
if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 1) ||
|
||||
(functionId == TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 3)) {
|
||||
int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->pParam);
|
||||
if (pItem->pNode->pParam == NULL ||
|
||||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) ||
|
||||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) {
|
||||
/* no parameters or more than one parameter for function */
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
@ -2182,11 +2185,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
|
||||
// 2. check if sql function can be applied on this column data type
|
||||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
|
||||
|
||||
if (!IS_NUMERIC_TYPE(pSchema->type)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && functionId == TSDB_FUNC_DIFF) {
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
|
||||
}
|
||||
|
||||
|
@ -2200,11 +2205,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
}
|
||||
|
||||
// set the first column ts for diff query
|
||||
if (functionId == TSDB_FUNC_DIFF) {
|
||||
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
colIndex += 1;
|
||||
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
|
||||
getNewResColId(pCmd), TSDB_KEYSIZE, false);
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
|
||||
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
|
||||
|
||||
SColumnList ids = createColumnList(1, 0, 0);
|
||||
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
|
||||
|
@ -2230,12 +2235,29 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
|
||||
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES);
|
||||
} else if (functionId == TSDB_FUNC_IRATE) {
|
||||
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
int64_t prec = info.precision;
|
||||
|
||||
tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
|
||||
} else if (functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
char val[8] = {0};
|
||||
|
||||
int64_t tickPerSec = 0;
|
||||
if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (info.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
tickPerSec /= 1000;
|
||||
}
|
||||
|
||||
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
|
||||
memset(val, 0, tListLen(val));
|
||||
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
|
||||
}
|
||||
|
||||
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
|
||||
|
@ -2935,8 +2957,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
const char* msg1 = "TWA not allowed to apply to super table directly";
|
||||
const char* msg2 = "TWA only support group by tbname for super table query";
|
||||
const char* msg1 = "TWA/Diff not allowed to apply to super table directly";
|
||||
const char* msg2 = "TWA/Diff only support group by tbname for super table query";
|
||||
const char* msg3 = "function not support for super table query";
|
||||
|
||||
// filter sql function not supported by metric query yet.
|
||||
|
@ -2949,7 +2971,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
|
|||
}
|
||||
}
|
||||
|
||||
if (tscIsTWAQuery(pQueryInfo)) {
|
||||
if (tscIsTWAQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) {
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
|
||||
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
return true;
|
||||
|
@ -6376,7 +6398,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
|
||||
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
|
||||
functId != TSDB_FUNC_DIFF && functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
|
@ -7820,6 +7842,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
|
||||
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
|
||||
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
|
||||
pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo);
|
||||
|
||||
SExprInfo** p = NULL;
|
||||
int32_t numOfExpr = 0;
|
||||
|
|
|
@ -1588,7 +1588,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
|
|||
return tscLocalResultCommonBuilder(pSql, numOfRes);
|
||||
}
|
||||
|
||||
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
||||
int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
|
@ -1615,12 +1615,13 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
|||
taosArrayPush(group, &tableKeyInfo);
|
||||
taosArrayPush(tableGroupInfo.pGroupList, &group);
|
||||
|
||||
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE);
|
||||
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self);
|
||||
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self);
|
||||
}
|
||||
|
||||
uint64_t localQueryId = 0;
|
||||
uint64_t localQueryId = pSql->self;
|
||||
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
|
||||
convertQueryResult(pRes, pQueryInfo);
|
||||
convertQueryResult(pRes, pQueryInfo, pSql->self);
|
||||
|
||||
code = pRes->code;
|
||||
if (pRes->code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -2689,7 +2690,7 @@ void tscInitMsgsFp() {
|
|||
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
|
||||
|
|
|
@ -456,7 +456,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) {
|
|||
|
||||
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
|
||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
pCmd->command == TSDB_SQL_SHOW ||
|
||||
|
|
|
@ -1977,9 +1977,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
|
||||
tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
|
||||
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
|
||||
|
||||
tscDebug("0x%"PRIx64" start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
|
||||
if (pSupporter == NULL) { // failed to create support struct, abort current query
|
||||
|
@ -2424,7 +2423,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
|
||||
// pRes->code check only serves in launching metric sub-queries
|
||||
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function.
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
|
@ -2780,7 +2779,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
|
||||
} else {
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE;
|
||||
}
|
||||
|
||||
tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo);
|
||||
|
@ -3502,7 +3501,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
|
||||
char* sql, void* merger, int32_t stage) {
|
||||
char* sql, void* merger, int32_t stage, uint64_t qId) {
|
||||
assert(pQueryInfo != NULL);
|
||||
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
||||
if (pQInfo == NULL) {
|
||||
|
@ -3511,7 +3510,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
|
|||
|
||||
// to make sure third party won't overwrite this structure
|
||||
pQInfo->signature = pQInfo;
|
||||
|
||||
pQInfo->qId = qId;
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQueryAttr *pQueryAttr = &pQInfo->query;
|
||||
|
||||
|
|
|
@ -222,6 +222,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
functionId != TSDB_FUNC_TS &&
|
||||
functionId != TSDB_FUNC_ARITHM &&
|
||||
functionId != TSDB_FUNC_TS_COMP &&
|
||||
functionId != TSDB_FUNC_DIFF &&
|
||||
functionId != TSDB_FUNC_TS_DUMMY &&
|
||||
functionId != TSDB_FUNC_TID_TAG) {
|
||||
return false;
|
||||
}
|
||||
|
@ -434,6 +436,23 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
|
|||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool tscIsDiffQuery(SQueryInfo* pQueryInfo) {
|
||||
size_t num = tscNumOfExprs(pQueryInfo);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||
if (pExpr == NULL || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pExpr->base.functionId == TSDB_FUNC_DIFF) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
|
||||
return pQueryInfo->sessionWindow.gap > 0;
|
||||
}
|
||||
|
@ -467,42 +486,12 @@ bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool isSimpleAggregate(SQueryInfo* pQueryInfo) {
|
||||
if (pQueryInfo->interval.interval > 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Note:top/bottom query is fixed output query
|
||||
if (tscIsTopBotQuery(pQueryInfo) || tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||
if (pExpr == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t functionId = pExpr->base.functionId;
|
||||
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!IS_MULTIOUTPUT(aAggs[functionId].status)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
|
||||
if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) {
|
||||
if (tscIsDiffQuery(pQueryInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -518,13 +507,13 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (!IS_MULTIOUTPUT(aAggs[functionId].status)) {
|
||||
if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) ||
|
||||
(functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TS_COMP)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
bool isBlockDistQuery(SQueryInfo* pQueryInfo) {
|
||||
|
@ -812,6 +801,7 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
|
|||
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
|
||||
SJoinStatus* pStatus = &pJoinInfo->status[i];
|
||||
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
|
||||
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
|
||||
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
|
||||
pStatus->index = 0;
|
||||
|
||||
|
@ -1056,7 +1046,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId) {
|
||||
// set the correct result
|
||||
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
|
||||
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
|
||||
|
@ -1066,6 +1056,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
|||
tscSetResRawPtrRv(pRes, pQueryInfo, p);
|
||||
}
|
||||
|
||||
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
|
||||
pRes->row = 0;
|
||||
pRes->completed = (pRes->numOfRows == 0);
|
||||
}
|
||||
|
@ -1088,7 +1079,9 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t
|
|||
tfree(tableCols);
|
||||
}
|
||||
|
||||
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) {
|
||||
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) {
|
||||
SSqlRes* pOutput = &pSql->res;
|
||||
|
||||
// handle the following query process
|
||||
if (px->pQInfo == NULL) {
|
||||
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
|
||||
|
@ -1168,7 +1161,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
|||
}
|
||||
}
|
||||
|
||||
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
|
||||
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self);
|
||||
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self);
|
||||
|
||||
tfree(pColumnInfo);
|
||||
tfree(schema);
|
||||
|
||||
|
@ -1176,9 +1171,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
|||
pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv;
|
||||
}
|
||||
|
||||
uint64_t qId = 0;
|
||||
uint64_t qId = pSql->self;
|
||||
qTableQuery(px->pQInfo, &qId);
|
||||
convertQueryResult(pOutput, px);
|
||||
convertQueryResult(pOutput, px, pSql->self);
|
||||
}
|
||||
|
||||
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
||||
|
@ -1374,7 +1369,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
|||
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
int32_t cmd = pCmd->command;
|
||||
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||
tscRemoveFromSqlList(pSql);
|
||||
}
|
||||
|
@ -2177,6 +2172,7 @@ size_t tscNumOfExprs(SQueryInfo* pQueryInfo) {
|
|||
return taosArrayGetSize(pQueryInfo->exprList);
|
||||
}
|
||||
|
||||
// todo REFACTOR
|
||||
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
|
||||
assert (pExpr != NULL || argument != NULL || bytes != 0);
|
||||
|
||||
|
@ -3278,7 +3274,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
pNewQueryInfo->numOfTables = 0;
|
||||
pNewQueryInfo->pTableMetaInfo = NULL;
|
||||
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
|
||||
|
||||
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
|
||||
if (pNewQueryInfo->buf == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -3438,7 +3433,7 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg) {
|
|||
}
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
||||
handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res);
|
||||
handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, pSql);
|
||||
|
||||
pSql->res.qId = -1;
|
||||
if (pSql->res.code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -3468,13 +3463,12 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
|
|||
}
|
||||
|
||||
pParentSql->cmd.active = pParentSql->cmd.pQueryInfo;
|
||||
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = doRetrieveSubqueryData;
|
||||
schedMsg.ahandle = (void *)pParentSql;
|
||||
schedMsg.thandle = (void *)1;
|
||||
schedMsg.msg = 0;
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
pParentSql->res.qId = -1;
|
||||
if (pSql->res.code == TSDB_CODE_SUCCESS) {
|
||||
(*pSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
|
||||
} else {
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
}
|
||||
}
|
||||
|
||||
// todo handle the failure
|
||||
|
@ -4238,7 +4232,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo);
|
||||
pQueryAttr->stabledev = isStabledev(pQueryInfo);
|
||||
pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo);
|
||||
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
|
||||
pQueryAttr->diffQuery = tscIsDiffQuery(pQueryInfo);
|
||||
pQueryAttr->simpleAgg = isSimpleAggregateRv(pQueryInfo);
|
||||
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
|
||||
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
|
||||
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
|
||||
|
@ -4257,7 +4252,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->fillType = pQueryInfo->fillType;
|
||||
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
|
||||
|
||||
|
||||
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
|
||||
pQueryAttr->window = pQueryInfo->window;
|
||||
} else {
|
||||
|
|
|
@ -76,7 +76,7 @@ enum {
|
|||
// SQL below for client local
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_GLOBALMERGE, "retrieve-globalmerge" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" )
|
||||
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table")
|
||||
|
|
|
@ -66,17 +66,19 @@ extern "C" {
|
|||
#define TSDB_FUNC_RATE 29
|
||||
#define TSDB_FUNC_IRATE 30
|
||||
#define TSDB_FUNC_TID_TAG 31
|
||||
#define TSDB_FUNC_BLKINFO 32
|
||||
#define TSDB_FUNC_DERIVATIVE 32
|
||||
#define TSDB_FUNC_BLKINFO 33
|
||||
|
||||
#define TSDB_FUNC_HISTOGRAM 33
|
||||
#define TSDB_FUNC_HLL 34
|
||||
#define TSDB_FUNC_MODE 35
|
||||
#define TSDB_FUNC_SAMPLE 36
|
||||
#define TSDB_FUNC_CEIL 37
|
||||
#define TSDB_FUNC_FLOOR 38
|
||||
#define TSDB_FUNC_ROUND 39
|
||||
#define TSDB_FUNC_MAVG 40
|
||||
#define TSDB_FUNC_CSUM 41
|
||||
|
||||
#define TSDB_FUNC_HISTOGRAM 34
|
||||
#define TSDB_FUNC_HLL 35
|
||||
#define TSDB_FUNC_MODE 36
|
||||
#define TSDB_FUNC_SAMPLE 37
|
||||
#define TSDB_FUNC_CEIL 38
|
||||
#define TSDB_FUNC_FLOOR 39
|
||||
#define TSDB_FUNC_ROUND 40
|
||||
#define TSDB_FUNC_MAVG 41
|
||||
#define TSDB_FUNC_CSUM 42
|
||||
|
||||
#define TSDB_FUNCSTATE_SO 0x1u // single output
|
||||
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
|
||||
|
|
|
@ -185,6 +185,7 @@ typedef struct SQueryAttr {
|
|||
bool queryBlockDist; // if query data block distribution
|
||||
bool stabledev; // super table stddev query
|
||||
bool tsCompQuery; // is tscomp query
|
||||
bool diffQuery; // is diff query
|
||||
bool simpleAgg;
|
||||
bool pointInterpQuery; // point interpolation query
|
||||
bool needReverseScan; // need reverse scan
|
||||
|
@ -245,6 +246,7 @@ typedef struct SQueryRuntimeEnv {
|
|||
void* pQueryHandle;
|
||||
|
||||
int32_t prevGroupId; // previous executed group id
|
||||
bool enableGroupData;
|
||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||
char* keyBuf; // window key buffer
|
||||
|
@ -386,6 +388,7 @@ typedef struct STableScanInfo {
|
|||
int64_t elapsedTime;
|
||||
|
||||
int32_t tableIndex;
|
||||
int32_t prevGroupId; // previous table group id
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
|
|
|
@ -138,6 +138,7 @@ typedef struct SQueryInfo {
|
|||
bool hasFilter;
|
||||
bool onlyTagQuery;
|
||||
bool orderProjectQuery;
|
||||
bool diffQuery;
|
||||
bool stateWindow;
|
||||
} SQueryInfo;
|
||||
|
||||
|
|
|
@ -161,6 +161,14 @@ typedef struct SRateInfo {
|
|||
bool isIRate; // true for IRate functions, false for Rate functions
|
||||
} SRateInfo;
|
||||
|
||||
typedef struct SDerivInfo {
|
||||
double prevValue; // previous value
|
||||
TSKEY prevTs; // previous timestamp
|
||||
bool ignoreNegative;// ignore the negative value
|
||||
int64_t tsWindow; // time window for derivative
|
||||
bool valueSet; // the value has been set already
|
||||
} SDerivInfo;
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable) {
|
||||
if (!isValidDataType(dataType)) {
|
||||
|
@ -189,7 +197,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
|
||||
*interBytes = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_BLKINFO) {
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_BLKINFO) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = 16384;
|
||||
*interBytes = 0;
|
||||
|
@ -217,6 +227,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double); // this results is compressed ts data, only one byte
|
||||
*interBytes = sizeof(SDerivInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (isSuperTable) {
|
||||
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
|
@ -3393,7 +3410,7 @@ enum {
|
|||
};
|
||||
|
||||
static bool diff_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (function_setup(pCtx)) {
|
||||
if (!function_setup(pCtx)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -3402,6 +3419,227 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool deriv_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// diff function require the value is set to -1
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
pDerivInfo->ignoreNegative = pCtx->param[2].i64;
|
||||
pDerivInfo->prevTs = -1;
|
||||
pDerivInfo->tsWindow = pCtx->param[0].i64;
|
||||
pDerivInfo->valueSet = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
static void deriv_function(SQLFunctionCtx *pCtx) {
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
void *data = GET_INPUT_DATA_LIST(pCtx);
|
||||
bool isFirstBlock = (pDerivInfo->valueSet == false);
|
||||
|
||||
int32_t notNullElems = 0;
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
|
||||
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
|
||||
|
||||
TSKEY *pTimestamp = pCtx->ptsOutputBuf;
|
||||
TSKEY *tsList = GET_TS_LIST(pCtx);
|
||||
|
||||
double *pOutput = (double *)pCtx->pOutput;
|
||||
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t *pData = (int32_t *)data;
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
|
||||
} else {
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = pData[i];
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
notNullElems++;
|
||||
}
|
||||
|
||||
break;
|
||||
};
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t *pData = (int64_t *)data;
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
|
||||
} else {
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = (double) pData[i];
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
notNullElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double *pData = (double *)data;
|
||||
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
|
||||
} else {
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = pData[i];
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
notNullElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float *pData = (float *)data;
|
||||
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
|
||||
} else {
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = pData[i];
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
notNullElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t *pData = (int16_t *)data;
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
|
||||
} else {
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = pData[i];
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
notNullElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *pData = (int8_t *)data;
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
*pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && *pOutput < 0) {
|
||||
} else {
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = pData[i];
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
notNullElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("error input type");
|
||||
}
|
||||
|
||||
// initial value is not set yet, all data block are null
|
||||
if (!pDerivInfo->valueSet || notNullElems <= 0) {
|
||||
/*
|
||||
* 1. current block and blocks before are full of null
|
||||
* 2. current block may be null value
|
||||
*/
|
||||
assert(pCtx->hasNull);
|
||||
} else {
|
||||
int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems;
|
||||
GET_RES_INFO(pCtx)->numOfRes += forwardStep;
|
||||
}
|
||||
}
|
||||
|
||||
#define DIFF_IMPL(ctx, d, type) \
|
||||
do { \
|
||||
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
|
||||
(ctx)->param[1].nType = (ctx)->inputType; \
|
||||
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
|
||||
} else { \
|
||||
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
|
||||
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
|
||||
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
// TODO difference in date column
|
||||
static void diff_function(SQLFunctionCtx *pCtx) {
|
||||
void *data = GET_INPUT_DATA_LIST(pCtx);
|
||||
|
@ -3425,19 +3663,9 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
pCtx->param[1].i64 = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
} else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) {
|
||||
*pOutput = (int32_t)(pData[i] - pCtx->param[1].i64);
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
*pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
@ -3457,19 +3685,9 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
pCtx->param[1].i64 = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
} else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) {
|
||||
*pOutput = pData[i] - pCtx->param[1].i64;
|
||||
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
*pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = pData[i] - pCtx->param[1].i64;
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
@ -3489,16 +3707,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
pCtx->param[1].dKey = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
} else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) {
|
||||
*pOutput = pData[i] - pCtx->param[1].dKey;
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = pData[i] - pCtx->param[1].dKey;
|
||||
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
*pOutput = pData[i] - pCtx->param[1].dKey; // direct previous may be null
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
|
@ -3519,24 +3729,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
pCtx->param[1].dKey = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
} else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) {
|
||||
*pOutput = (float)(pData[i] - pCtx->param[1].dKey);
|
||||
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
*pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = (float)(pData[i] - pCtx->param[1].dKey);
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
||||
// keep the last value, the remain may be all null
|
||||
pCtx->param[1].dKey = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
notNullElems++;
|
||||
|
@ -3552,18 +3751,9 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
pCtx->param[1].i64 = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
} else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) {
|
||||
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64);
|
||||
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
|
||||
*pTimestamp = tsList[i];
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64);
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
@ -3574,6 +3764,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *pData = (int8_t *)data;
|
||||
int8_t *pOutput = (int8_t *)pCtx->pOutput;
|
||||
|
@ -3583,19 +3774,9 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
pCtx->param[1].i64 = pData[i];
|
||||
pCtx->param[1].nType = pCtx->inputType;
|
||||
} else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) {
|
||||
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64);
|
||||
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
|
||||
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
} else {
|
||||
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64);
|
||||
*pTimestamp = tsList[i];
|
||||
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
@ -3624,18 +3805,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
#define DIFF_IMPL(ctx, d, type) \
|
||||
do { \
|
||||
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
|
||||
(ctx)->param[1].nType = (ctx)->inputType; \
|
||||
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
|
||||
} else { \
|
||||
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
|
||||
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
|
||||
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
char *pData = GET_INPUT_DATA(pCtx, index);
|
||||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||
|
@ -5215,7 +5384,7 @@ SAggFunctionInfo aAggs[] = {{
|
|||
"diff",
|
||||
TSDB_FUNC_DIFF,
|
||||
TSDB_FUNC_INVALID_ID,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
|
||||
diff_function_setup,
|
||||
diff_function,
|
||||
diff_function_f,
|
||||
|
@ -5315,8 +5484,20 @@ SAggFunctionInfo aAggs[] = {{
|
|||
noop1,
|
||||
dataBlockRequired,
|
||||
},
|
||||
{ //32
|
||||
"derivative", // return table id and the corresponding tags for join match and subscribe
|
||||
TSDB_FUNC_DERIVATIVE,
|
||||
TSDB_FUNC_INVALID_ID,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
|
||||
deriv_function_setup,
|
||||
deriv_function,
|
||||
noop2,
|
||||
doFinalizer,
|
||||
noop1,
|
||||
dataBlockRequired,
|
||||
},
|
||||
{
|
||||
// 32
|
||||
// 33
|
||||
"_block_dist", // return table id and the corresponding tags for join match and subscribe
|
||||
TSDB_FUNC_BLKINFO,
|
||||
TSDB_FUNC_BLKINFO,
|
||||
|
|
|
@ -1681,6 +1681,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||
pRuntimeEnv->enableGroupData = false;
|
||||
|
||||
pRuntimeEnv->pQueryAttr = pQueryAttr;
|
||||
|
||||
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
@ -2652,7 +2654,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
|
||||
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
|
||||
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
|
||||
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
|
||||
pRuntimeEnv->current->groupIndex);
|
||||
|
@ -3119,8 +3121,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
|||
assert(pCtx[i].pOutput != NULL);
|
||||
|
||||
// set the timestamp output buffer for top/bottom/diff query
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||
int32_t fid = pCtx[i].functionId;
|
||||
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) {
|
||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
||||
}
|
||||
}
|
||||
|
@ -4271,6 +4273,14 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
|
|||
|
||||
pRuntimeEnv->current = *pTableQueryInfo;
|
||||
doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo);
|
||||
|
||||
if (pRuntimeEnv->enableGroupData) {
|
||||
if(pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) {
|
||||
*newgroup = true;
|
||||
}
|
||||
}
|
||||
|
||||
pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex;
|
||||
}
|
||||
|
||||
// this function never returns error?
|
||||
|
@ -4417,6 +4427,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
|
|||
pInfo->reverseTimes = 0;
|
||||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||
pInfo->current = 0;
|
||||
// pInfo->prevGroupId = -1;
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "TableScanOperator";
|
||||
|
@ -4439,6 +4450,8 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
|
|||
pInfo->reverseTimes = 0;
|
||||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||
pInfo->current = 0;
|
||||
pInfo->prevGroupId = -1;
|
||||
pRuntimeEnv->enableGroupData = true;
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "TableSeqScanOperator";
|
||||
|
@ -4543,6 +4556,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
|
|||
pInfo->reverseTimes = reverseTime;
|
||||
pInfo->current = 0;
|
||||
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
|
||||
// pInfo->prevGroupId = -1;
|
||||
|
||||
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
|
||||
pOptr->name = "DataBlocksOptimizedScanOperator";
|
||||
|
@ -4921,10 +4935,17 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
|
|||
}
|
||||
|
||||
// Return result of the previous group in the firstly.
|
||||
if (*newgroup && pRes->info.rows > 0) {
|
||||
if (*newgroup) {
|
||||
if (pRes->info.rows > 0) {
|
||||
pArithInfo->existDataBlock = pBlock;
|
||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||
return pInfo->pRes;
|
||||
} else { // init output buffer for a new group data
|
||||
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
|
||||
}
|
||||
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
|
@ -7442,7 +7463,6 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
|||
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data);
|
||||
}
|
||||
|
||||
pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows;
|
||||
qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,
|
||||
pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total);
|
||||
|
||||
|
|
|
@ -531,7 +531,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
|
|||
} else {
|
||||
if (pQueryAttr->queryBlockDist) {
|
||||
op = OP_TableBlockInfoScan;
|
||||
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
|
||||
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->diffQuery) {
|
||||
op = OP_TableSeqScan;
|
||||
} else if (pQueryAttr->needReverseScan) {
|
||||
op = OP_DataBlocksOptScan;
|
||||
|
@ -605,7 +605,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
}
|
||||
} else if (pQueryAttr->simpleAgg) {
|
||||
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) {
|
||||
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery && !pQueryAttr->diffQuery) {
|
||||
op = OP_MultiTableAggregate;
|
||||
} else {
|
||||
op = OP_Aggregate;
|
||||
|
|
|
@ -241,15 +241,16 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
|||
|
||||
bool newgroup = false;
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
|
||||
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
|
||||
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId);
|
||||
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
|
||||
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, %"PRId64" rows are returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
|
||||
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
|
||||
pRuntimeEnv->resultInfo.total);
|
||||
} else {
|
||||
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, numOfTotal:%" PRId64 " rows",
|
||||
pQInfo->qId, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv));
|
||||
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId,
|
||||
GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total);
|
||||
}
|
||||
|
||||
return doBuildResCheck(pQInfo);
|
||||
|
|
|
@ -1811,10 +1811,6 @@ if $data09 != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
sql select st0.*,st1.* from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1 order by st0.ts limit 5 offset 5
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
|
@ -2294,7 +2290,6 @@ if $data19 != 9925 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.f1=tb1_1.f1;
|
||||
sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.ts=tb1_1.ts and tb0_1.id1=tb1_1.id2;
|
||||
sql_error select tb0_5.*, tb1_5.*,tb2_5.*,tb3_5.*,tb4_5.*,tb5_5.*, tb6_5.*,tb7_5.*,tb8_5.*,tb9_5.*,tba_5.* from tb0_5, tb1_5, tb2_5, tb3_5, tb4_5,tb5_5, tb6_5, tb7_5, tb8_5, tb9_5, tba_5 where tb9_5.ts=tb8_5.ts and tb8_5.ts=tb7_5.ts and tb7_5.ts=tb6_5.ts and tb6_5.ts=tb5_5.ts and tb5_5.ts=tb4_5.ts and tb4_5.ts=tb3_5.ts and tb3_5.ts=tb2_5.ts and tb2_5.ts=tb1_5.ts and tb1_5.ts=tb0_5.ts and tb0_5.ts=tba_5.ts;
|
||||
|
@ -2317,10 +2312,4 @@ sql_error select last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 g
|
|||
sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9 where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1;
|
||||
sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9,sta where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1 and st0.id1=sta.id1 and st0.ts=sta.ts;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -9,7 +9,7 @@ sql connect
|
|||
print ======================== dnode1 start
|
||||
|
||||
$db = testdb
|
||||
|
||||
sql drop database if exists $db
|
||||
sql create database $db cachelast 2
|
||||
sql use $db
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@ run general/parser/slimit1.sim
|
|||
run general/parser/slimit_alter_tags.sim
|
||||
run general/parser/tbnameIn.sim
|
||||
run general/parser/join.sim
|
||||
#run general/parser/join_multitables.sim
|
||||
run general/parser/join_multivnode.sim
|
||||
run general/parser/join_manyblocks.sim
|
||||
run general/parser/projection_limit_offset.sim
|
||||
|
|
Loading…
Reference in New Issue