From b0281578d557b1825a6af85c6fd5940a73cead78 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Jun 2021 11:49:56 +0800 Subject: [PATCH 01/11] [td-4529] diff support super table query. --- src/client/inc/tscUtil.h | 4 +- src/client/inc/tsclient.h | 2 +- src/client/src/tscAsync.c | 8 ++-- src/client/src/tscLocal.c | 2 +- src/client/src/tscSQLParser.c | 11 +++--- src/client/src/tscServer.c | 9 +++-- src/client/src/tscSql.c | 2 +- src/client/src/tscSubquery.c | 8 ++-- src/client/src/tscUtil.c | 72 ++++++++++++++++------------------- src/common/inc/tcmdtype.h | 2 +- src/query/inc/qExecutor.h | 2 + src/query/inc/qTableMeta.h | 1 + src/query/src/qAggMain.c | 8 ++-- src/query/src/qExecutor.c | 27 ++++++++++--- src/query/src/qPlan.c | 4 +- src/query/src/queryMain.c | 7 ++-- 16 files changed, 92 insertions(+), 77 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f747184ce0..448fee936f 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); +bool tscIsDiffQuery(SQueryInfo* pQueryInfo); bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); @@ -132,7 +133,6 @@ bool hasTagValOutput(SQueryInfo* pQueryInfo); bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo); bool isStabledev(SQueryInfo* pQueryInfo); bool isTsCompQuery(SQueryInfo* pQueryInfo); -bool isSimpleAggregate(SQueryInfo* pQueryInfo); bool isBlockDistQuery(SQueryInfo* pQueryInfo); bool isSimpleAggregateRv(SQueryInfo* pQueryInfo); @@ -331,7 +331,7 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema); -void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage); +void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 67fd34ffd7..46aad90c42 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -319,7 +319,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock); -void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput); +void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent); void destroyTableNameList(SInsertStatementParam* pInsertParam); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 15276a3888..d857d00e15 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -144,7 +144,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { } // local merge has handle this situation during super table non-projection query. - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) { + if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) { pRes->numOfClauseTotal += pRes->numOfRows; } @@ -174,7 +174,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo } pSql->fp = fp; - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } @@ -257,14 +257,14 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { } return; - } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) { + } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) { // in case of show command, return no data (*pSql->fetchFp)(param, pSql, 0); } else { assert(0); } } else { // current query is not completed, continue retrieve from node - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index f97f54a626..2c9717306f 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -323,7 +323,7 @@ TAOS_ROW tscFetchRow(void *param) { // current data set are exhausted, fetch more data from node if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || + pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 15e40d5918..5d712bb980 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2928,8 +2928,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) { } bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { - const char* msg1 = "TWA not allowed to apply to super table directly"; - const char* msg2 = "TWA only support group by tbname for super table query"; + const char* msg1 = "TWA/Diff not allowed to apply to super table directly"; + const char* msg2 = "TWA/Diff only support group by tbname for super table query"; const char* msg3 = "function not support for super table query"; // filter sql function not supported by metric query yet. @@ -2942,7 +2942,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) } } - if (tscIsTWAQuery(pQueryInfo)) { + if (tscIsTWAQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return true; @@ -6276,7 +6276,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM && - functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) { + functId != TSDB_FUNC_DIFF && functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -6758,7 +6758,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg6 = "from missing in subclause"; const char* msg7 = "time interval is required"; const char* msg8 = "the first column should be primary timestamp column"; - + SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); assert(pQueryInfo->numOfTables == 1); @@ -7719,6 +7719,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); + pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo); SExprInfo** p = NULL; int32_t numOfExpr = 0; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2763005303..0fd8d6a0c7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1588,7 +1588,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, numOfRes); } -int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { +int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd* pCmd = &pSql->cmd; @@ -1615,10 +1615,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { taosArrayPush(group, &tableKeyInfo); taosArrayPush(tableGroupInfo.pGroupList, &group); - pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE); + tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); + pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); } - uint64_t localQueryId = 0; + uint64_t localQueryId = pSql->self; qTableQuery(pQueryInfo->pQInfo, &localQueryId); convertQueryResult(pRes, pQueryInfo); @@ -2689,7 +2690,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 554ce351eb..1e93892876 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -456,7 +456,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) { return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || + pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 4b1b3e9080..431357bf08 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2422,7 +2422,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { // pRes->code check only serves in launching metric sub-queries if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function. + pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function. return pRes->code; } @@ -2778,7 +2778,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) { pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty } else { - pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE; } tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo); @@ -3500,7 +3500,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator, - char* sql, void* merger, int32_t stage) { + char* sql, void* merger, int32_t stage, uint64_t qId) { assert(pQueryInfo != NULL); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -3509,7 +3509,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - + pQInfo->qId = qId; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = &pQInfo->query; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 8042f032c8..7a9e2d2289 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -222,6 +222,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM && functionId != TSDB_FUNC_TS_COMP && + functionId != TSDB_FUNC_DIFF && + functionId != TSDB_FUNC_TS_DUMMY && functionId != TSDB_FUNC_TID_TAG) { return false; } @@ -434,6 +436,23 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { return false; } + +bool tscIsDiffQuery(SQueryInfo* pQueryInfo) { + size_t num = tscNumOfExprs(pQueryInfo); + for(int32_t i = 0; i < num; ++i) { + SExprInfo* pExpr = tscExprGet(pQueryInfo, i); + if (pExpr == NULL || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (pExpr->base.functionId == TSDB_FUNC_DIFF) { + return true; + } + } + + return false; +} + bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) { return pQueryInfo->sessionWindow.gap > 0; } @@ -467,42 +486,12 @@ bool tscNeedReverseScan(SQueryInfo* pQueryInfo) { return false; } -bool isSimpleAggregate(SQueryInfo* pQueryInfo) { - if (pQueryInfo->interval.interval > 0) { - return false; - } - - // Note:top/bottom query is fixed output query - if (tscIsTopBotQuery(pQueryInfo) || tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) { - return true; - } - - size_t numOfExprs = tscNumOfExprs(pQueryInfo); - for (int32_t i = 0; i < numOfExprs; ++i) { - SExprInfo* pExpr = tscExprGet(pQueryInfo, i); - if (pExpr == NULL) { - continue; - } - - int32_t functionId = pExpr->base.functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (!IS_MULTIOUTPUT(aAggs[functionId].status)) { - return true; - } - } - - return false; -} - bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) { return false; } - if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) { + if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { return false; } @@ -812,6 +801,7 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup) for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) { SJoinStatus* pStatus = &pJoinInfo->status[i]; if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) { + tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream); pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); pStatus->index = 0; @@ -1088,7 +1078,9 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t tfree(tableCols); } -void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { +void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) { + SSqlRes* pOutput = &pSql->res; + // handle the following query process if (px->pQInfo == NULL) { SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); @@ -1166,7 +1158,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue } } - px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); + tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest query is ready", pSql->self, pSql->self); + px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self); + tfree(pColumnInfo); tfree(schema); @@ -1174,7 +1168,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv; } - uint64_t qId = 0; + uint64_t qId = pSql->self; qTableQuery(px->pQInfo, &qId); convertQueryResult(pOutput, px); } @@ -1372,7 +1366,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; int32_t cmd = pCmd->command; - if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || + if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscRemoveFromSqlList(pSql); } @@ -3436,7 +3430,7 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg) { } SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); - handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res); + handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, pSql); pSql->res.qId = -1; if (pSql->res.code == TSDB_CODE_SUCCESS) { @@ -4236,10 +4230,11 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo); pQueryAttr->stabledev = isStabledev(pQueryInfo); pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo); - pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo); + pQueryAttr->diffQuery = pQueryInfo->diffQuery; + pQueryAttr->simpleAgg = pQueryInfo->simpleAgg; pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo); pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); - pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo); + pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && pQueryInfo->groupbyColumn; pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); @@ -4255,7 +4250,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->havingNum = pQueryInfo->havingFieldNum; - if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor pQueryAttr->window = pQueryInfo->window; } else { diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h index adf210cfeb..90426519e6 100644 --- a/src/common/inc/tcmdtype.h +++ b/src/common/inc/tcmdtype.h @@ -76,7 +76,7 @@ enum { // SQL below for client local TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_GLOBALMERGE, "retrieve-globalmerge" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table") diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b5b4fb5854..93f2b73d7a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -185,6 +185,7 @@ typedef struct SQueryAttr { bool queryBlockDist; // if query data block distribution bool stabledev; // super table stddev query bool tsCompQuery; // is tscomp query + bool diffQuery; // is diff query bool simpleAgg; bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan @@ -386,6 +387,7 @@ typedef struct STableScanInfo { int64_t elapsedTime; int32_t tableIndex; + int32_t prevGroupId; // previous table group id } STableScanInfo; typedef struct STagScanInfo { diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index 4fc252b644..1fd78ed324 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -138,6 +138,7 @@ typedef struct SQueryInfo { bool hasFilter; bool onlyTagQuery; bool orderProjectQuery; + bool diffQuery; bool stateWindow; } SQueryInfo; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index ba6efcabb2..74c95e7281 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3393,7 +3393,7 @@ enum { }; static bool diff_function_setup(SQLFunctionCtx *pCtx) { - if (function_setup(pCtx)) { + if (!function_setup(pCtx)) { return false; } @@ -4606,7 +4606,7 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { pRateInfo->lastValue = v; pRateInfo->lastKey = primaryKey[index]; - + SET_VAL(pCtx, 1, 1); // set has result flag @@ -4630,7 +4630,7 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) { static void rate_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - + if (pRateInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); return; @@ -5215,7 +5215,7 @@ SAggFunctionInfo aAggs[] = {{ "diff", TSDB_FUNC_DIFF, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, diff_function_setup, diff_function, diff_function_f, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9e1534eb24..7972649e25 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2652,7 +2652,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate + } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pRuntimeEnv->current->groupIndex); @@ -4273,6 +4273,12 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { pRuntimeEnv->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo); + + if (pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) { + *newgroup = true; + } + + pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex; } // this function never returns error? @@ -4419,6 +4425,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->current = 0; +// pInfo->prevGroupId = -1; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableScanOperator"; @@ -4441,6 +4448,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->current = 0; + pInfo->prevGroupId = -1; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableSeqScanOperator"; @@ -4545,6 +4553,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime pInfo->reverseTimes = reverseTime; pInfo->current = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; +// pInfo->prevGroupId = -1; SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DataBlocksOptimizedScanOperator"; @@ -4923,10 +4932,17 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { } // Return result of the previous group in the firstly. - if (*newgroup && pRes->info.rows > 0) { - pArithInfo->existDataBlock = pBlock; - clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); - return pInfo->pRes; + if (*newgroup) { + if (pRes->info.rows > 0) { + pArithInfo->existDataBlock = pBlock; + clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); + return pInfo->pRes; + } else { // init output buffer for a new group data + for (int32_t j = 0; j < pOperator->numOfOutput; ++j) { + aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]); + } + initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput); + } } STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; @@ -7442,7 +7458,6 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); } - pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows; qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 24531c7f4e..ee587a515d 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -531,7 +531,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) { } else { if (pQueryAttr->queryBlockDist) { op = OP_TableBlockInfoScan; - } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) { + } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->diffQuery) { op = OP_TableSeqScan; } else if (pQueryAttr->needReverseScan) { op = OP_DataBlocksOptScan; @@ -605,7 +605,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } } else if (pQueryAttr->simpleAgg) { - if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) { + if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery && !pQueryAttr->diffQuery) { op = OP_MultiTableAggregate; } else { op = OP_Aggregate; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index f00d5ceb41..fa2eac6619 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -241,15 +241,16 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { bool newgroup = false; pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); + pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv); if (isQueryKilled(pQInfo)) { qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId); } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { - qDebug("QInfo:0x%"PRIx64" over, %u tables queried, %"PRId64" rows are returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->resultInfo.total); } else { - qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, numOfTotal:%" PRId64 " rows", - pQInfo->qId, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv)); + qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId, + GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total); } return doBuildResCheck(pQInfo); From e6072702ebce4342a71d141aee73ed45c228eac6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Jun 2021 12:47:57 +0800 Subject: [PATCH 02/11] [td-4529] update log and merge develop. --- src/client/src/tscUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 659f8399de..d5693a29bf 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1160,7 +1160,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue } } - tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest query is ready", pSql->self, pSql->self); + tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self); px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self); tfree(pColumnInfo); From ae91d43a7fa6e7df784f703b856340bc4f882665 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Jun 2021 13:28:06 +0800 Subject: [PATCH 03/11] [td-4529] --- src/client/src/tscUtil.c | 10 ++++++++++ src/query/src/qExecutor.c | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d5693a29bf..b621f89d52 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3273,6 +3273,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->bufLen = pQueryInfo->bufLen; + pNewQueryInfo->projectionQuery = pQueryInfo->projectionQuery; + pNewQueryInfo->hasFilter = pQueryInfo->hasFilter; + pNewQueryInfo->simpleAgg = pQueryInfo->simpleAgg; + pNewQueryInfo->onlyTagQuery = pQueryInfo->onlyTagQuery; + pNewQueryInfo->groupbyColumn = pQueryInfo->groupbyColumn; + + pNewQueryInfo->arithmeticOnAgg = pQueryInfo->arithmeticOnAgg; + pNewQueryInfo->orderProjectQuery = pQueryInfo->orderProjectQuery; + pNewQueryInfo->diffQuery = pQueryInfo->diffQuery; + pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); if (pNewQueryInfo->buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3aae336169..d14189657a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4275,7 +4275,7 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo); if (pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) { - *newgroup = true; + *newgroup = false; } pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex; From 0e223ea2c13fdf366270cfaab36635572b00e3cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Jun 2021 14:26:28 +0800 Subject: [PATCH 04/11] [td-225] fix bugs found by regression test. --- src/client/inc/tscUtil.h | 2 -- src/client/src/tscSQLParser.c | 14 +++++++------- src/client/src/tscUtil.c | 19 ++++--------------- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 448fee936f..0a57767926 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -329,8 +329,6 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta); SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); - -void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema); void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId); void* malloc_throw(size_t size); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 53bea3f48b..4a80cbd340 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -7812,15 +7812,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } { // set the query info - pQueryInfo->projectionQuery = tscIsProjectionQuery(pQueryInfo); - pQueryInfo->hasFilter = tscHasColumnFilter(pQueryInfo); - pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo); - pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo); - pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo); + pQueryInfo->projectionQuery = tscIsProjectionQuery(pQueryInfo); + pQueryInfo->hasFilter = tscHasColumnFilter(pQueryInfo); + pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo); + pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo); + pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo); - pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); + pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); - pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo); + pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo); SExprInfo** p = NULL; int32_t numOfExpr = 0; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b621f89d52..627fbbd0ee 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -491,7 +491,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { return false; } - if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { + if (/*tscGroupbyColumn(pQueryInfo) || */isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { return false; } @@ -3272,17 +3272,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNewQueryInfo->numOfTables = 0; pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->bufLen = pQueryInfo->bufLen; - - pNewQueryInfo->projectionQuery = pQueryInfo->projectionQuery; - pNewQueryInfo->hasFilter = pQueryInfo->hasFilter; - pNewQueryInfo->simpleAgg = pQueryInfo->simpleAgg; - pNewQueryInfo->onlyTagQuery = pQueryInfo->onlyTagQuery; - pNewQueryInfo->groupbyColumn = pQueryInfo->groupbyColumn; - - pNewQueryInfo->arithmeticOnAgg = pQueryInfo->arithmeticOnAgg; - pNewQueryInfo->orderProjectQuery = pQueryInfo->orderProjectQuery; - pNewQueryInfo->diffQuery = pQueryInfo->diffQuery; - pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); if (pNewQueryInfo->buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -4242,11 +4231,11 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo); pQueryAttr->stabledev = isStabledev(pQueryInfo); pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo); - pQueryAttr->diffQuery = pQueryInfo->diffQuery; - pQueryAttr->simpleAgg = pQueryInfo->simpleAgg; + pQueryAttr->diffQuery = tscIsDiffQuery(pQueryInfo); + pQueryAttr->simpleAgg = isSimpleAggregateRv(pQueryInfo); pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo); pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); - pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && pQueryInfo->groupbyColumn; + pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo); pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); From 89abdd980927fcfc5f5aed3713968563b9f8893d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 6 Jun 2021 15:20:19 +0800 Subject: [PATCH 05/11] [td-4555]: add derivative function --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscSQLParser.c | 40 +- src/client/src/tscServer.c | 2 +- src/client/src/tscSubquery.c | 3 +- src/client/src/tscUtil.c | 12 +- src/query/inc/qAggMain.h | 22 +- src/query/inc/qExecutor.h | 1 + src/query/src/qAggMain.c | 358 ++++++++++++++++-- src/query/src/qExecutor.c | 13 +- .../general/parser/join_multitables.sim | 11 - tests/script/general/parser/testSuite.sim | 1 + 11 files changed, 383 insertions(+), 82 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 0a57767926..427431d3c0 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -214,7 +214,7 @@ void tscColumnListDestroy(SArray* pColList); void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid); void tscColumnListCopyAll(SArray* dst, const SArray* src); -void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo); +void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId); void tscDequoteAndTrimToken(SStrToken* pToken); int32_t tscValidateName(SStrToken* pToken); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4a80cbd340..d48f193e07 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2157,11 +2157,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_MIN: case TSDB_FUNC_MAX: case TSDB_FUNC_DIFF: + case TSDB_FUNC_DERIVATIVE: case TSDB_FUNC_STDDEV: case TSDB_FUNC_LEASTSQR: { // 1. valid the number of parameters - if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 1) || - (functionId == TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 3)) { + int32_t numOfParams = taosArrayGetSize(pItem->pNode->pParam); + if (pItem->pNode->pParam == NULL || + (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) || + ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) { /* no parameters or more than one parameter for function */ return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -2182,11 +2185,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // 2. check if sql function can be applied on this column data type pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta); + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); if (!IS_NUMERIC_TYPE(pSchema->type)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); - } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && functionId == TSDB_FUNC_DIFF) { + } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); } @@ -2200,11 +2205,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } // set the first column ts for diff query - if (functionId == TSDB_FUNC_DIFF) { + if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { colIndex += 1; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; - SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, - getNewResColId(pCmd), TSDB_KEYSIZE, false); + SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, + TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false); SColumnList ids = createColumnList(1, 0, 0); insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr); @@ -2230,12 +2235,29 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_TSC_INVALID_OPERATION; } - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES); } else if (functionId == TSDB_FUNC_IRATE) { - STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta); int64_t prec = info.precision; - tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); + } else if (functionId == TSDB_FUNC_DERIVATIVE) { + char val[8] = {0}; + + int64_t tickPerSec = 0; + if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + if (info.precision == TSDB_TIME_PRECISION_MILLI) { + tickPerSec /= 1000; + } + + tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); + memset(val, 0, tListLen(val)); + if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); } SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5f2de43ef0..880c58aa1c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1621,7 +1621,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { uint64_t localQueryId = pSql->self; qTableQuery(pQueryInfo->pQInfo, &localQueryId); - convertQueryResult(pRes, pQueryInfo); + convertQueryResult(pRes, pQueryInfo, pSql->self); code = pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index cf732a8f20..49ce738545 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1977,9 +1977,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); - tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self); + tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); - tscDebug("0x%"PRIx64" start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); if (pSupporter == NULL) { // failed to create support struct, abort current query diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 627fbbd0ee..46d49bf68d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -491,7 +491,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { return false; } - if (/*tscGroupbyColumn(pQueryInfo) || */isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { + if (tscIsDiffQuery(pQueryInfo)) { return false; } @@ -507,13 +507,13 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { continue; } - if (!IS_MULTIOUTPUT(aAggs[functionId].status)) { + if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) || + (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TS_COMP)) { return true; } } return false; - } bool isBlockDistQuery(SQueryInfo* pQueryInfo) { @@ -1046,7 +1046,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp return pOperator; } -void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { +void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId) { // set the correct result SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; pRes->numOfRows = (p != NULL)? p->info.rows: 0; @@ -1056,6 +1056,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { tscSetResRawPtrRv(pRes, pQueryInfo, p); } + tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows); pRes->row = 0; pRes->completed = (pRes->numOfRows == 0); } @@ -1172,7 +1173,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue uint64_t qId = pSql->self; qTableQuery(px->pQInfo, &qId); - convertQueryResult(pOutput, px); + convertQueryResult(pOutput, px, pSql->self); } static void tscDestroyResPointerInfo(SSqlRes* pRes) { @@ -2171,6 +2172,7 @@ size_t tscNumOfExprs(SQueryInfo* pQueryInfo) { return taosArrayGetSize(pQueryInfo->exprList); } +// todo REFACTOR void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) { assert (pExpr != NULL || argument != NULL || bytes != 0); diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 47c61fc444..57e7d2982f 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -66,17 +66,19 @@ extern "C" { #define TSDB_FUNC_RATE 29 #define TSDB_FUNC_IRATE 30 #define TSDB_FUNC_TID_TAG 31 -#define TSDB_FUNC_BLKINFO 32 +#define TSDB_FUNC_DERIVATIVE 32 +#define TSDB_FUNC_BLKINFO 33 -#define TSDB_FUNC_HISTOGRAM 33 -#define TSDB_FUNC_HLL 34 -#define TSDB_FUNC_MODE 35 -#define TSDB_FUNC_SAMPLE 36 -#define TSDB_FUNC_CEIL 37 -#define TSDB_FUNC_FLOOR 38 -#define TSDB_FUNC_ROUND 39 -#define TSDB_FUNC_MAVG 40 -#define TSDB_FUNC_CSUM 41 + +#define TSDB_FUNC_HISTOGRAM 34 +#define TSDB_FUNC_HLL 35 +#define TSDB_FUNC_MODE 36 +#define TSDB_FUNC_SAMPLE 37 +#define TSDB_FUNC_CEIL 38 +#define TSDB_FUNC_FLOOR 39 +#define TSDB_FUNC_ROUND 40 +#define TSDB_FUNC_MAVG 41 +#define TSDB_FUNC_CSUM 42 #define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 93f2b73d7a..7e76bd7622 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -246,6 +246,7 @@ typedef struct SQueryRuntimeEnv { void* pQueryHandle; int32_t prevGroupId; // previous executed group id + bool enableGroupData; SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 74c95e7281..dc1829930c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -161,6 +161,14 @@ typedef struct SRateInfo { bool isIRate; // true for IRate functions, false for Rate functions } SRateInfo; +typedef struct SDerivInfo { + double prevValue; // previous value + TSKEY prevTs; // previous timestamp + bool ignoreNegative;// ignore the negative value + int64_t tsWindow; // time window for derivative + bool valueSet; // the value has been set already +} SDerivInfo; + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable) { if (!isValidDataType(dataType)) { @@ -189,7 +197,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); *interBytes = 0; return TSDB_CODE_SUCCESS; - } else if (functionId == TSDB_FUNC_BLKINFO) { + } + + if (functionId == TSDB_FUNC_BLKINFO) { *type = TSDB_DATA_TYPE_BINARY; *bytes = 16384; *interBytes = 0; @@ -216,7 +226,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *interBytes = POINTER_BYTES; return TSDB_CODE_SUCCESS; } - + + if (functionId == TSDB_FUNC_DERIVATIVE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(double); // this results is compressed ts data, only one byte + *interBytes = sizeof(SDerivInfo); + return TSDB_CODE_SUCCESS; + } + + //TODO twa function definit error. if (isSuperTable) { if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) { *type = TSDB_DATA_TYPE_BINARY; @@ -3402,16 +3420,266 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx) { return false; } +#define DIFF_IMPL(ctx, d, type) \ + do { \ + if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ + (ctx)->param[1].nType = (ctx)->inputType; \ + *(type *)&(ctx)->param[1].i64 = *(type *)(d); \ + } else { \ + *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \ + *(type *)(&(ctx)->param[1].i64) = *(type *)(d); \ + *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ + } \ + } while (0); + +static bool deriv_function_setup(SQLFunctionCtx *pCtx) { + if (!function_setup(pCtx)) { + return false; + } + + // diff function require the value is set to -1 + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); + + pDerivInfo->ignoreNegative = pCtx->param[2].i64; + pDerivInfo->prevTs = -1; + pDerivInfo->tsWindow = pCtx->param[0].i64; + pDerivInfo->valueSet = false; + return false; +} + +static void deriv_function(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); + + void *data = GET_INPUT_DATA_LIST(pCtx); + bool isFirstBlock = (pDerivInfo->valueSet == false); + + int32_t notNullElems = 0; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; + + TSKEY *pTimestamp = pCtx->ptsOutputBuf; + TSKEY *tsList = GET_TS_LIST(pCtx); + + double *pOutput = (double *)pCtx->pOutput; + + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_INT: { + int32_t *pData = (int32_t *)data; + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) { + continue; + } + + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + *pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && *pOutput < 0) { + } else { + *pTimestamp = tsList[i]; + + pOutput += 1; + pTimestamp += 1; + } + } + + pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevTs = tsList[i]; + notNullElems++; + } + + break; + }; + + case TSDB_DATA_TYPE_BIGINT: { + int64_t *pData = (int64_t *)data; + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) { + continue; + } + + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + *pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && *pOutput < 0) { + } else { + *pTimestamp = tsList[i]; + + pOutput += 1; + pTimestamp += 1; + } + } + + pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevTs = tsList[i]; + notNullElems++; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *pData = (double *)data; + + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) { + continue; + } + + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + *pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && *pOutput < 0) { + } else { + *pTimestamp = tsList[i]; + + pOutput += 1; + pTimestamp += 1; + } + } + + pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevTs = tsList[i]; + notNullElems++; + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: { + float *pData = (float *)data; + + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) { + continue; + } + + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + *pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && *pOutput < 0) { + } else { + *pTimestamp = tsList[i]; + + pOutput += 1; + pTimestamp += 1; + } + } + + pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevTs = tsList[i]; + notNullElems++; + } + break; + } + + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *pData = (int16_t *)data; + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char *)&pData[i], pCtx->inputType)) { + continue; + } + + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + *pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && *pOutput < 0) { + } else { + *pTimestamp = tsList[i]; + + pOutput += 1; + pTimestamp += 1; + } + } + + pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevTs = tsList[i]; + notNullElems++; + } + break; + } + + case TSDB_DATA_TYPE_TINYINT: { + int8_t *pData = (int8_t *)data; + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) { + continue; + } + + if (!pDerivInfo->valueSet) { // initial value is not set yet + pDerivInfo->valueSet = true; + } else { + *pOutput = ((pData[i] - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + if (pDerivInfo->ignoreNegative && *pOutput < 0) { + } else { + *pTimestamp = tsList[i]; + + pOutput += 1; + pTimestamp += 1; + } + } + + pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevTs = tsList[i]; + notNullElems++; + } + break; + } + default: + qError("error input type"); + } + + // initial value is not set yet, all data block are null + if (!pDerivInfo->valueSet || notNullElems <= 0) { + /* + * 1. current block and blocks before are full of null + * 2. current block may be null value + */ + assert(pCtx->hasNull); + } else { + int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems; + GET_RES_INFO(pCtx)->numOfRes += forwardStep; + } +} + +#define DIFF_IMPL(ctx, d, type) \ + do { \ + if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ + (ctx)->param[1].nType = (ctx)->inputType; \ + *(type *)&(ctx)->param[1].i64 = *(type *)(d); \ + } else { \ + *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \ + *(type *)(&(ctx)->param[1].i64) = *(type *)(d); \ + *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ + } \ + } while (0); + +#define DIFF_IMPL(ctx, d, type) \ + do { \ + if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ + (ctx)->param[1].nType = (ctx)->inputType; \ + *(type *)&(ctx)->param[1].i64 = *(type *)(d); \ + } else { \ + *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \ + *(type *)(&(ctx)->param[1].i64) = *(type *)(d); \ + *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ + } \ + } while (0); + // TODO difference in date column static void diff_function(SQLFunctionCtx *pCtx) { void *data = GET_INPUT_DATA_LIST(pCtx); bool isFirstBlock = (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED); - + int32_t notNullElems = 0; - + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; - + TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* tsList = GET_TS_LIST(pCtx); @@ -3419,29 +3687,29 @@ static void diff_function(SQLFunctionCtx *pCtx) { case TSDB_DATA_TYPE_INT: { int32_t *pData = (int32_t *)data; int32_t *pOutput = (int32_t *)pCtx->pOutput; - + for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - + if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } else { *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } - + pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; @@ -3451,29 +3719,29 @@ static void diff_function(SQLFunctionCtx *pCtx) { case TSDB_DATA_TYPE_BIGINT: { int64_t *pData = (int64_t *)data; int64_t *pOutput = (int64_t *)pCtx->pOutput; - + for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - + if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = pData[i] - pCtx->param[1].i64; *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } else { *pOutput = pData[i] - pCtx->param[1].i64; *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } - + pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; @@ -3483,12 +3751,12 @@ static void diff_function(SQLFunctionCtx *pCtx) { case TSDB_DATA_TYPE_DOUBLE: { double *pData = (double *)data; double *pOutput = (double *)pCtx->pOutput; - + for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - + if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet pCtx->param[1].dKey = pData[i]; pCtx->param[1].nType = pCtx->inputType; @@ -3503,7 +3771,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { pOutput += 1; pTimestamp += 1; } - + pCtx->param[1].dKey = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; @@ -3513,29 +3781,29 @@ static void diff_function(SQLFunctionCtx *pCtx) { case TSDB_DATA_TYPE_FLOAT: { float *pData = (float *)data; float *pOutput = (float *)pCtx->pOutput; - + for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - + if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet pCtx->param[1].dKey = pData[i]; pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (float)(pData[i] - pCtx->param[1].dKey); *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } else { *pOutput = (float)(pData[i] - pCtx->param[1].dKey); *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } - + // keep the last value, the remain may be all null pCtx->param[1].dKey = pData[i]; pCtx->param[1].nType = pCtx->inputType; @@ -3546,12 +3814,12 @@ static void diff_function(SQLFunctionCtx *pCtx) { case TSDB_DATA_TYPE_SMALLINT: { int16_t *pData = (int16_t *)data; int16_t *pOutput = (int16_t *)pCtx->pOutput; - + for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { continue; } - + if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; @@ -3563,11 +3831,11 @@ static void diff_function(SQLFunctionCtx *pCtx) { } else { *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } - + pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; @@ -3577,29 +3845,29 @@ static void diff_function(SQLFunctionCtx *pCtx) { case TSDB_DATA_TYPE_TINYINT: { int8_t *pData = (int8_t *)data; int8_t *pOutput = (int8_t *)pCtx->pOutput; - + for (; i < pCtx->size && i >= 0; i += step) { if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) { continue; } - + if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } else { *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); *pTimestamp = tsList[i]; - + pOutput += 1; pTimestamp += 1; } - + pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; @@ -3609,7 +3877,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { default: qError("error input type"); } - + // initial value is not set yet if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) { /* @@ -3619,7 +3887,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { assert(pCtx->hasNull); } else { int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems; - + GET_RES_INFO(pCtx)->numOfRes += forwardStep; } } @@ -3641,14 +3909,14 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } - + // the output start from the second source element if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is set GET_RES_INFO(pCtx)->numOfRes += 1; } - + int32_t step = 1/*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; - + switch (pCtx->inputType) { case TSDB_DATA_TYPE_INT: { if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet @@ -3684,7 +3952,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { default: qError("error input type"); } - + if (GET_RES_INFO(pCtx)->numOfRes > 0) { pCtx->pOutput += pCtx->outputBytes * step; pCtx->ptsOutputBuf = (char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * step; @@ -5315,8 +5583,20 @@ SAggFunctionInfo aAggs[] = {{ noop1, dataBlockRequired, }, + { //32 + "derivative", // return table id and the corresponding tags for join match and subscribe + TSDB_FUNC_DERIVATIVE, + TSDB_FUNC_INVALID_ID, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, + deriv_function_setup, + deriv_function, + noop2, + doFinalizer, + noop1, + dataBlockRequired, + }, { - // 32 + // 33 "_block_dist", // return table id and the corresponding tags for join match and subscribe TSDB_FUNC_BLKINFO, TSDB_FUNC_BLKINFO, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d14189657a..338861144e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1681,6 +1681,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; pRuntimeEnv->prevGroupId = INT32_MIN; + pRuntimeEnv->enableGroupData = false; + pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -3119,8 +3121,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i assert(pCtx[i].pOutput != NULL); // set the timestamp output buffer for top/bottom/diff query - int32_t functionId = pCtx[i].functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + int32_t fid = pCtx[i].functionId; + if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) { pCtx[i].ptsOutputBuf = pCtx[0].pOutput; } } @@ -4274,8 +4276,10 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { pRuntimeEnv->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo); - if (pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) { - *newgroup = false; + if (pRuntimeEnv->enableGroupData) { + if(pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) { + *newgroup = true; + } } pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex; @@ -4449,6 +4453,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->current = 0; pInfo->prevGroupId = -1; + pRuntimeEnv->enableGroupData = true; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableSeqScanOperator"; diff --git a/tests/script/general/parser/join_multitables.sim b/tests/script/general/parser/join_multitables.sim index acb8be10e7..d675499640 100644 --- a/tests/script/general/parser/join_multitables.sim +++ b/tests/script/general/parser/join_multitables.sim @@ -1811,10 +1811,6 @@ if $data09 != 3 then return -1 endi - - - - sql select st0.*,st1.* from st0, st1 where st1.id1=st0.id1 and st0.ts=st1.ts and st1.ts=st0.ts and st0.id1=st1.id1 order by st0.ts limit 5 offset 5 if $rows != 5 then return -1 @@ -2294,7 +2290,6 @@ if $data19 != 9925 then return -1 endi - sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.f1=tb1_1.f1; sql_error select tb0_1.*, tb1_1.* from tb0_1, tb1_1 where tb0_1.ts=tb1_1.ts and tb0_1.id1=tb1_1.id2; sql_error select tb0_5.*, tb1_5.*,tb2_5.*,tb3_5.*,tb4_5.*,tb5_5.*, tb6_5.*,tb7_5.*,tb8_5.*,tb9_5.*,tba_5.* from tb0_5, tb1_5, tb2_5, tb3_5, tb4_5,tb5_5, tb6_5, tb7_5, tb8_5, tb9_5, tba_5 where tb9_5.ts=tb8_5.ts and tb8_5.ts=tb7_5.ts and tb7_5.ts=tb6_5.ts and tb6_5.ts=tb5_5.ts and tb5_5.ts=tb4_5.ts and tb4_5.ts=tb3_5.ts and tb3_5.ts=tb2_5.ts and tb2_5.ts=tb1_5.ts and tb1_5.ts=tb0_5.ts and tb0_5.ts=tba_5.ts; @@ -2317,10 +2312,4 @@ sql_error select last(*) from st0, st1 where st0.ts=st1.ts and st0.id1=st1.id1 g sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9 where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1; sql_error select st0.*,st1.*,st2.*,st3.*,st4.*,st5.*,st6.*,st7.*,st8.*,st9.* from st0,st1,st2,st3,st4,st5,st6,st7,st8,st9,sta where st0.ts=st2.ts and st0.ts=st4.ts and st0.ts=st6.ts and st0.ts=st8.ts and st1.ts=st3.ts and st3.ts=st5.ts and st5.ts=st7.ts and st7.ts=st9.ts and st0.ts=st1.ts and st0.id1=st2.id1 and st0.id1=st4.id1 and st0.id1=st6.id1 and st0.id1=st8.id1 and st1.id1=st3.id1 and st3.id1=st5.id1 and st5.id1=st7.id1 and st7.id1=st9.id1 and st0.id1=st1.id1 and st0.id1=sta.id1 and st0.ts=sta.ts; - - - - - - system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index 6265fc3a02..bb220049af 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -39,6 +39,7 @@ run general/parser/slimit1.sim run general/parser/slimit_alter_tags.sim run general/parser/tbnameIn.sim run general/parser/join.sim +#run general/parser/join_multitables.sim run general/parser/join_multivnode.sim run general/parser/join_manyblocks.sim run general/parser/projection_limit_offset.sim From 678cc06fbb968494d7e7c09cb21a422f1ad90759 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 6 Jun 2021 16:09:58 +0800 Subject: [PATCH 06/11] [td-4546]: fix bug in taos_fetch_block while nest query existed. --- src/client/src/tscUtil.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 46d49bf68d..280d2aa630 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3463,13 +3463,12 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { } pParentSql->cmd.active = pParentSql->cmd.pQueryInfo; - - SSchedMsg schedMsg = {0}; - schedMsg.fp = doRetrieveSubqueryData; - schedMsg.ahandle = (void *)pParentSql; - schedMsg.thandle = (void *)1; - schedMsg.msg = 0; - taosScheduleTask(tscQhandle, &schedMsg); + pParentSql->res.qId = -1; + if (pSql->res.code == TSDB_CODE_SUCCESS) { + (*pSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); + } else { + tscAsyncResultOnError(pParentSql); + } } // todo handle the failure From 9b073353e62d0c1c49a93ba0bba41e4a451f936a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Jun 2021 12:08:40 +0800 Subject: [PATCH 07/11] [td-225] fix bug found by regression test. --- src/client/src/tscSQLParser.c | 2 +- src/query/src/qAggMain.c | 138 +++------------------ tests/script/general/parser/last_cache.sim | 2 +- 3 files changed, 21 insertions(+), 121 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d48f193e07..8f915346f1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2161,7 +2161,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_STDDEV: case TSDB_FUNC_LEASTSQR: { // 1. valid the number of parameters - int32_t numOfParams = taosArrayGetSize(pItem->pNode->pParam); + int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: taosArrayGetSize(pItem->pNode->pParam); if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) || ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) { diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index dc1829930c..baef9a57a4 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -234,7 +234,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } - //TODO twa function definit error. if (isSuperTable) { if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) { *type = TSDB_DATA_TYPE_BINARY; @@ -3420,18 +3419,6 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx) { return false; } -#define DIFF_IMPL(ctx, d, type) \ - do { \ - if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].i64 = *(type *)(d); \ - } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \ - *(type *)(&(ctx)->param[1].i64) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ - } \ - } while (0); - static bool deriv_function_setup(SQLFunctionCtx *pCtx) { if (!function_setup(pCtx)) { return false; @@ -3480,7 +3467,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - pOutput += 1; pTimestamp += 1; } @@ -3508,7 +3494,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - pOutput += 1; pTimestamp += 1; } @@ -3535,7 +3520,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - pOutput += 1; pTimestamp += 1; } @@ -3563,7 +3547,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - pOutput += 1; pTimestamp += 1; } @@ -3590,7 +3573,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { if (pDerivInfo->ignoreNegative && *pOutput < 0) { } else { *pTimestamp = tsList[i]; - pOutput += 1; pTimestamp += 1; } @@ -3646,18 +3628,6 @@ static void deriv_function(SQLFunctionCtx *pCtx) { } } -#define DIFF_IMPL(ctx, d, type) \ - do { \ - if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].i64 = *(type *)(d); \ - } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \ - *(type *)(&(ctx)->param[1].i64) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ - } \ - } while (0); - #define DIFF_IMPL(ctx, d, type) \ do { \ if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ @@ -3693,20 +3663,10 @@ static void diff_function(SQLFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].i64 = pData[i]; - pCtx->param[1].nType = pCtx->inputType; - } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); - *pTimestamp = tsList[i]; - - pOutput += 1; - pTimestamp += 1; - } else { + if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - - pOutput += 1; + pOutput += 1; pTimestamp += 1; } @@ -3725,20 +3685,10 @@ static void diff_function(SQLFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].i64 = pData[i]; - pCtx->param[1].nType = pCtx->inputType; - } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { - *pOutput = pData[i] - pCtx->param[1].i64; + if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - - pOutput += 1; - pTimestamp += 1; - } else { - *pOutput = pData[i] - pCtx->param[1].i64; - *pTimestamp = tsList[i]; - - pOutput += 1; + pOutput += 1; pTimestamp += 1; } @@ -3757,22 +3707,14 @@ static void diff_function(SQLFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].dKey = pData[i]; - pCtx->param[1].nType = pCtx->inputType; - } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { - *pOutput = pData[i] - pCtx->param[1].dKey; + if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - pOutput += 1; - pTimestamp += 1; - } else { - *pOutput = pData[i] - pCtx->param[1].dKey; - *pTimestamp = tsList[i]; - pOutput += 1; + pOutput += 1; pTimestamp += 1; } - pCtx->param[1].dKey = pData[i]; + pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; } @@ -3787,25 +3729,14 @@ static void diff_function(SQLFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].dKey = pData[i]; - pCtx->param[1].nType = pCtx->inputType; - } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { - *pOutput = (float)(pData[i] - pCtx->param[1].dKey); + if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - - pOutput += 1; - pTimestamp += 1; - } else { - *pOutput = (float)(pData[i] - pCtx->param[1].dKey); - *pTimestamp = tsList[i]; - - pOutput += 1; + pOutput += 1; pTimestamp += 1; } - // keep the last value, the remain may be all null - pCtx->param[1].dKey = pData[i]; + pCtx->param[1].i64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; } @@ -3820,19 +3751,10 @@ static void diff_function(SQLFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].i64 = pData[i]; - pCtx->param[1].nType = pCtx->inputType; - } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { - *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); + if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - pOutput += 1; - pTimestamp += 1; - } else { - *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); - *pTimestamp = tsList[i]; - - pOutput += 1; + pOutput += 1; pTimestamp += 1; } @@ -3851,20 +3773,10 @@ static void diff_function(SQLFunctionCtx *pCtx) { continue; } - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].i64 = pData[i]; - pCtx->param[1].nType = pCtx->inputType; - } else if ((i == 0 && pCtx->order == TSDB_ORDER_ASC) || (i == pCtx->size - 1 && pCtx->order == TSDB_ORDER_DESC)) { - *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); + if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet + *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; - - pOutput += 1; - pTimestamp += 1; - } else { - *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); - *pTimestamp = tsList[i]; - - pOutput += 1; + pOutput += 1; pTimestamp += 1; } @@ -3892,18 +3804,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { } } -#define DIFF_IMPL(ctx, d, type) \ - do { \ - if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].i64 = *(type *)(d); \ - } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \ - *(type *)(&(ctx)->param[1].i64) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \ - } \ - } while (0); - static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { char *pData = GET_INPUT_DATA(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { diff --git a/tests/script/general/parser/last_cache.sim b/tests/script/general/parser/last_cache.sim index 9d7da9ddba..4b3285871b 100644 --- a/tests/script/general/parser/last_cache.sim +++ b/tests/script/general/parser/last_cache.sim @@ -9,7 +9,7 @@ sql connect print ======================== dnode1 start $db = testdb - +sql drop database if exists $db sql create database $db cachelast 2 sql use $db From 0c1d5f4a4c20ffbc669a2ee30374b8cbc8e7acec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Jun 2021 13:10:59 +0800 Subject: [PATCH 08/11] [td-225] fix compiler error. --- src/query/src/qAggMain.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index baef9a57a4..ced83d539b 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3499,7 +3499,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) { } } - pDerivInfo->prevValue = pData[i]; + pDerivInfo->prevValue = (double) pData[i]; pDerivInfo->prevTs = tsList[i]; notNullElems++; } @@ -3686,7 +3686,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null + *pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3708,13 +3708,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null + *pOutput = pData[i] - pCtx->param[1].d64; // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i64 = pData[i]; + pCtx->param[1].d64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; } @@ -3730,13 +3730,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null + *pOutput = pData[i] - pCtx->param[1].d64; // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].i64 = pData[i]; + pCtx->param[1].d64 = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; } @@ -3752,7 +3752,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null + *pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; @@ -3774,7 +3774,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null + *pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; From db8defa8da74d56ebc25d211800c5424f8ad154c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Jun 2021 14:18:40 +0800 Subject: [PATCH 09/11] [td-225] fix compiler error. --- src/query/src/qAggMain.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index ced83d539b..cdc1a67c06 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3708,13 +3708,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = pData[i] - pCtx->param[1].d64; // direct previous may be null + *pOutput = pData[i] - pCtx->param[1].dKey; // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].d64 = pData[i]; + pCtx->param[1].dKey = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; } @@ -3730,13 +3730,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = pData[i] - pCtx->param[1].d64; // direct previous may be null + *pOutput = pData[i] - pCtx->param[1].dKey; // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; } - pCtx->param[1].d64 = pData[i]; + pCtx->param[1].dKey = pData[i]; pCtx->param[1].nType = pCtx->inputType; notNullElems++; } From 76638283b130e95be2f5dedde143903286c29a2b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Jun 2021 14:28:04 +0800 Subject: [PATCH 10/11] [td-225] fix compiler error. --- src/query/src/qAggMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index cdc1a67c06..0cb895709c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3730,7 +3730,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - *pOutput = pData[i] - pCtx->param[1].dKey; // direct previous may be null + *pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null *pTimestamp = tsList[i]; pOutput += 1; pTimestamp += 1; From e275dd32cd94bc6b6011811efdf50e182350659a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Jun 2021 15:28:57 +0800 Subject: [PATCH 11/11] [td-225] fix compiler error. --- src/client/src/tscSQLParser.c | 2 +- src/query/src/qAggMain.c | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8f915346f1..15a76daab7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2161,7 +2161,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_STDDEV: case TSDB_FUNC_LEASTSQR: { // 1. valid the number of parameters - int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: taosArrayGetSize(pItem->pNode->pParam); + int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->pParam); if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) || ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) { diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 0cb895709c..62be49c3df 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3764,6 +3764,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } break; } + case TSDB_DATA_TYPE_TINYINT: { int8_t *pData = (int8_t *)data; int8_t *pOutput = (int8_t *)pCtx->pOutput;