diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index daab5d1b64..76b62db3f1 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -154,7 +154,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); -void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); +void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid, bool deepcopy); void* tscSqlExprDestroy(SSqlExpr* pExpr); void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4893835d56..3e4da10aec 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -96,9 +96,9 @@ typedef struct SFieldInfo { } SFieldInfo; typedef struct SSqlExprInfo { - int16_t numOfAlloc; - int16_t numOfExprs; - SSqlExpr *pExprs; + int16_t numOfAlloc; + int16_t numOfExprs; + SSqlExpr** pExprs; } SSqlExprInfo; typedef struct SColumnBase { @@ -283,12 +283,11 @@ typedef struct { int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable int row; - int16_t numOfnchar; + int16_t numOfCols; int16_t precision; int32_t numOfGroups; SResRec * pGroupRec; char * data; - short * bytes; void ** tsrow; char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) SColumnIndex *pColumnIndex; @@ -406,8 +405,6 @@ int taos_retrieve(TAOS_RES *res); int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo); void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo); -void tscClearSqlMetaInfoForce(SSqlCmd *pCmd); - int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index a0ff10220d..c6d4b8ff23 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -300,6 +300,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } +bool stableQueryFunctChanged(int32_t funcId) { + return (aAggs[funcId].stableFuncId != funcId); +} + /** * the numOfRes should be kept, since it may be used later * and allow the ResultInfo to be re initialized @@ -3558,6 +3562,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { } GET_RES_INFO(pCtx)->numOfRes = 1; // todo add test case + doFinalizer(pCtx); } /* diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index a17e090b06..afd8e98eda 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -275,6 +275,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pSubQueryInfo->tsBuf = NULL; // free result for async object will also free sqlObj + assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns taos_free_result(pPrevSub); SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); @@ -299,18 +300,20 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); - tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid); + tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); - + + pSupporter->exprsInfo.numOfExprs = 0; + pSupporter->fieldsInfo.numOfOutputCols = 0; + /* * if the first column of the secondary query is not ts function, add this function. * Because this column is required to filter with timestamp after intersecting. */ - if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { + if (pSupporter->exprsInfo.pExprs[0]->functionId != TSDB_FUNC_TS) { tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); } - // todo refactor function name SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 402838bb68..7208dc4d72 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -251,6 +251,13 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 3, TSDB_DATA_TYPE_BINARY, "Note", noteColLength); rowLen += noteColLength; + + //set the sqlexpr part + SColumnIndex index = {0}; + pQueryInfo->fieldsInfo.pSqlExpr[0] = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN, TSDB_COL_NAME_LEN); + pQueryInfo->fieldsInfo.pSqlExpr[1] = tscSqlExprInsert(pQueryInfo, 1, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength, typeColLength); + pQueryInfo->fieldsInfo.pSqlExpr[2] = tscSqlExprInsert(pQueryInfo, 2, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), sizeof(int32_t)); + pQueryInfo->fieldsInfo.pSqlExpr[3] = tscSqlExprInsert(pQueryInfo, 3, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength, noteColLength); return rowLen; } @@ -455,6 +462,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa tscInitResObjForLocalQuery(pSql, 1, valueLength); TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0); + pQueryInfo->fieldsInfo.pSqlExpr[0] = pQueryInfo->exprsInfo.pExprs[0]; + strncpy(pRes->data, val, pField->bytes); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b35019f3f3..acb2aaba6b 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -16,7 +16,6 @@ #define _XOPEN_SOURCE #define _DEFAULT_SOURCE -#include #include "os.h" #include "taos.h" #include "taosmsg.h" @@ -28,6 +27,7 @@ #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" +#include "tast.h" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" @@ -130,7 +130,7 @@ static int32_t tscQueryOnlyMetricTags(SQueryInfo* pQueryInfo, bool* queryOnMetri assert(QUERY_IS_STABLE_QUERY(pQueryInfo->type)); *queryOnMetricTags = true; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId != TSDB_FUNC_TAGPRJ && @@ -570,7 +570,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { * are available. */ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -623,7 +623,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { * check invalid SQL: * select count(tbname)/count(tag1)/count(tag2) from super_table_name interval(1d); */ - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); @@ -1330,7 +1330,8 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn SSchema* pSchema = tsGetColumnSchema(pMeterMeta, pIndex->columnIndex); char* colName = (pItem->aliasName == NULL) ? pSchema->name : pItem->aliasName; - + strncpy(pExpr->aliasName, colName, tListLen(pExpr->aliasName)); + SColumnList ids = {0}; ids.num = 1; ids.ids[0] = *pIndex; @@ -1339,7 +1340,7 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn ids.num = 0; } - insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, colName, pExpr); + insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr); } void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, @@ -1375,6 +1376,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum for (int32_t j = 0; j < numOfTotalColumns; ++j) { SSqlExpr* pExpr = doAddProjectCol(pQueryInfo, startPos + j, j, pIndex->tableIndex); + strncpy(pExpr->aliasName, pSchema[j].name, tListLen(pExpr->aliasName)); pIndex->columnIndex = j; SColumnList ids = {0}; @@ -1393,7 +1395,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI const char* msg0 = "invalid column name"; const char* msg1 = "tag for table query is not allowed"; - int32_t startPos = pQueryInfo->fieldsInfo.numOfOutputCols; + int32_t startPos = pQueryInfo->exprsInfo.numOfExprs; if (pItem->pNode->nSQLOptr == TK_ALL) { // project on all fields SColumnIndex index = COLUMN_INDEX_INITIALIZER; @@ -1471,7 +1473,8 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, } SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes); - + strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName)); + // for point interpolation/last_row query, we need the timestamp column to be loaded SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { @@ -2222,8 +2225,6 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); -// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, k); - int16_t functionId = aAggs[pExpr->functionId].stableFuncId; int32_t colIndex = pExpr->colInfo.colIdx; @@ -2257,14 +2258,27 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx); - if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || - (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) { +// if (/*(pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || +// (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) || +// pExpr->functionId == TSDB_FUNC_LAST_ROW*/) { // the final result size and type in the same as query on single table. // so here, set the flag to be false; int16_t inter = 0; - getResultDataInfo(pSchema->type, pSchema->bytes, pExpr->functionId, 0, &pExpr->resType, &pExpr->resBytes, + + int32_t functionId = pExpr->functionId; + if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { + continue; + } + + if (functionId == TSDB_FUNC_FIRST_DST) { + functionId = TSDB_FUNC_FIRST; + } else if (functionId == TSDB_FUNC_LAST_DST) { + functionId = TSDB_FUNC_LAST; + } + + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &pExpr->resType, &pExpr->resBytes, &inter, 0, false); - } +// } } } @@ -2274,7 +2288,7 @@ bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo) { const char* msg3 = "function not support for super table query"; // filter sql function not supported by metric query yet. - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_METRIC) == 0) { invalidSqlErrMsg(pQueryInfo->msg, msg3); @@ -2311,7 +2325,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) { // diff function cannot be executed with other function // arithmetic function can be executed with other arithmetic functions - for (int32_t i = startIdx + 1; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = startIdx + 1; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); int16_t functionId = pExpr->functionId; @@ -3852,6 +3866,7 @@ int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t o return TSDB_CODE_SUCCESS; } +// todo error !!!! int32_t tsRewriteFieldNameIfNecessary(SQueryInfo* pQueryInfo) { const char rep[] = {'(', ')', '*', ',', '.', '/', '\\', '+', '-', '%', ' '}; @@ -3898,7 +3913,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { } if (pQueryInfo->defaultVal == NULL) { - pQueryInfo->defaultVal = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(int64_t)); + pQueryInfo->defaultVal = calloc(pQueryInfo->exprsInfo.numOfExprs, sizeof(int64_t)); if (pQueryInfo->defaultVal == NULL) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } @@ -3908,7 +3923,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { pQueryInfo->interpoType = TSDB_INTERPO_NONE; } else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) { pQueryInfo->interpoType = TSDB_INTERPO_NULL; - for (int32_t i = START_INTERPO_COL_IDX; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = START_INTERPO_COL_IDX; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i); setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes); } @@ -3930,12 +3945,12 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { if (tscIsPointInterpQuery(pQueryInfo)) { startPos = 0; - if (numOfFillVal > pQueryInfo->fieldsInfo.numOfOutputCols) { - numOfFillVal = pQueryInfo->fieldsInfo.numOfOutputCols; + if (numOfFillVal > pQueryInfo->exprsInfo.numOfExprs) { + numOfFillVal = pQueryInfo->exprsInfo.numOfExprs; } } else { - numOfFillVal = (pFillToken->nExpr > pQueryInfo->fieldsInfo.numOfOutputCols) - ? pQueryInfo->fieldsInfo.numOfOutputCols + numOfFillVal = (pFillToken->nExpr > pQueryInfo->exprsInfo.numOfExprs) + ? pQueryInfo->exprsInfo.numOfExprs : pFillToken->nExpr; } @@ -3955,11 +3970,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { } } - if ((pFillToken->nExpr < pQueryInfo->fieldsInfo.numOfOutputCols) || - ((pFillToken->nExpr - 1 < pQueryInfo->fieldsInfo.numOfOutputCols) && (tscIsPointInterpQuery(pQueryInfo)))) { + if ((pFillToken->nExpr < pQueryInfo->exprsInfo.numOfExprs) || + ((pFillToken->nExpr - 1 < pQueryInfo->exprsInfo.numOfExprs) && (tscIsPointInterpQuery(pQueryInfo)))) { tVariantListItem* lastItem = &pFillToken->a[pFillToken->nExpr - 1]; - for (int32_t i = numOfFillVal; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = numOfFillVal; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i); if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { @@ -4363,7 +4378,7 @@ int32_t validateSqlFunctionInStreamSql(SQueryInfo* pQueryInfo) { return invalidSqlErrMsg(pQueryInfo->msg, msg0); } - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId; if (!IS_STREAM_QUERY_VALID(aAggs[functId].nStatus)) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); @@ -4378,13 +4393,13 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SQueryInfo* pQueryInfo) { const char* msg1 = "column projection is not compatible with interval"; // multi-output set/ todo refactor - for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); // projection query on primary timestamp, the selectivity function needs to be present. if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { bool hasSelectivity = false; - for (int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { SSqlExpr* pEx = tscSqlExprGet(pQueryInfo, j); if ((aAggs[pEx->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) { hasSelectivity = true; @@ -4645,7 +4660,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* } // filter the query functions operating on "tbname" column that are not supported by normal columns. - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->colInfo.colIdx == TSDB_TBNAME_COLUMN_INDEX) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); @@ -4783,13 +4798,13 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau int16_t bytes = pSchema[index.columnIndex].bytes; char* name = pSchema[index.columnIndex].name; - pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, TSDB_FUNC_TAG, &index, type, bytes, + pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index, type, bytes, bytes); pExpr->colInfo.flag = TSDB_COL_TAG; // NOTE: tag column does not add to source column list SColumnList ids = {0}; - insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name, pExpr); + insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, &ids, bytes, type, name, pExpr); int32_t relIndex = index.columnIndex; @@ -4816,7 +4831,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, index); SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = index}; - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, TSDB_FUNC_PRJ, &colIndex, + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_PRJ, &colIndex, pSchema->type, pSchema->bytes, pSchema->bytes); pExpr->colInfo.flag = TSDB_COL_NORMAL; @@ -4827,14 +4842,14 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { list.num = 1; list.ids[0] = colIndex; - insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &list, pSchema->bytes, pSchema->type, + insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs - 1, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr); - tscFieldInfoUpdateVisible(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutputCols - 1, false); + tscFieldInfoUpdateVisible(&pQueryInfo->fieldsInfo, pQueryInfo->exprsInfo.numOfExprs - 1, false); } static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { int32_t tagLength = 0; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TAGPRJ || pExpr->functionId == TSDB_FUNC_TAG) { pExpr->functionId = TSDB_FUNC_TAG_DUMMY; @@ -4848,7 +4863,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId != TSDB_FUNC_TAG_DUMMY && pExpr->functionId != TSDB_FUNC_TS_DUMMY) { SSchema* pColSchema = &pSchema[pExpr->colInfo.colIdx]; @@ -4859,7 +4874,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { } static void doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_PRJ) { bool qualifiedCol = false; @@ -4891,7 +4906,7 @@ static bool onlyTagPrjFunction(SQueryInfo* pQueryInfo) { bool hasTagPrj = false; bool hasColumnPrj = false; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_PRJ) { hasColumnPrj = true; @@ -4907,7 +4922,7 @@ static bool onlyTagPrjFunction(SQueryInfo* pQueryInfo) { static bool allTagPrjInGroupby(SQueryInfo* pQueryInfo) { bool allInGroupby = true; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId != TSDB_FUNC_TAGPRJ) { continue; @@ -4924,7 +4939,7 @@ static bool allTagPrjInGroupby(SQueryInfo* pQueryInfo) { } static void updateTagPrjFunction(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TAGPRJ) { pExpr->functionId = TSDB_FUNC_TAG; @@ -4946,7 +4961,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) { int16_t numOfSelectivity = 0; int16_t numOfAggregation = 0; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TAGPRJ || (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)) { @@ -4955,7 +4970,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) { } } - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { int16_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_ARITHM) { @@ -4987,7 +5002,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) { * If more than one selectivity functions exist, all the selectivity functions must be last_row. * Otherwise, return with error code. */ - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { int16_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; if (functionId == TSDB_FUNC_TAGPRJ) { continue; @@ -5048,14 +5063,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { if (TSDB_COL_IS_TAG(pColIndex->flag)) { SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex}; - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, TSDB_FUNC_TAG, &index, + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs, TSDB_FUNC_TAG, &index, type, bytes, bytes); pExpr->colInfo.flag = TSDB_COL_TAG; // NOTE: tag column does not add to source column list SColumnList ids = {0}; - insertResultField(pQueryInfo, pQueryInfo->fieldsInfo.numOfOutputCols, &ids, bytes, type, name, pExpr); + insertResultField(pQueryInfo, pQueryInfo->exprsInfo.numOfExprs-1, &ids, bytes, type, name, pExpr); } else { // if this query is "group by" normal column, interval is not allowed if (pQueryInfo->intervalTime > 0) { @@ -5063,7 +5078,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { } bool hasGroupColumn = false; - for (int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, j); if (pExpr->colInfo.colId == pColIndex->colId) { break; @@ -5106,7 +5121,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } // check all query functions in selection clause, multi-output functions are not allowed - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); int32_t functId = pExpr->functionId; @@ -5194,11 +5209,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { } SSqlExpr* pExpr1 = tscSqlExprInsertEmpty(pQueryInfo, 0, TSDB_FUNC_TAG_DUMMY); - if (pExprList->a[0].aliasName != NULL) { - strncpy(pExpr1->aliasName, pExprList->a[0].aliasName, tListLen(pExpr1->aliasName)); - } else { - strncpy(pExpr1->aliasName, functionsInfo[index].name, tListLen(pExpr1->aliasName)); - } + const char* name = (pExprList->a[0].aliasName != NULL)? pExprList->a[0].aliasName:functionsInfo[index].name; + strncpy(pExpr1->aliasName, name, tListLen(pExpr1->aliasName)); switch (index) { case 0: @@ -5777,9 +5789,9 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* // set the input column data byte and type. for (int32_t i = 0; i < pExprInfo->numOfExprs; ++i) { - if (strcmp((*pExpr)->pSchema->name, pExprInfo->pExprs[i].aliasName) == 0) { - (*pExpr)->pSchema->type = pExprInfo->pExprs[i].resType; - (*pExpr)->pSchema->bytes = pExprInfo->pExprs[i].resBytes; + if (strcmp((*pExpr)->pSchema->name, pExprInfo->pExprs[i]->aliasName) == 0) { + (*pExpr)->pSchema->type = pExprInfo->pExprs[i]->resType; + (*pExpr)->pSchema->bytes = pExprInfo->pExprs[i]->resBytes; break; } } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index a156522cdf..51a59005f0 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -13,13 +13,13 @@ * along with this program. If not, see . */ +#include "tscSecondaryMerge.h" #include "os.h" #include "tlosertree.h" -#include "tscSecondaryMerge.h" #include "tscUtil.h" +#include "tschemautil.h" #include "tsclient.h" #include "tutil.h" -#include "tschemautil.h" typedef struct SCompareParam { SLocalDataSource **pLocalData; @@ -59,19 +59,20 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu * the fields and offset attributes in pCmd and pModel may be different due to * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object. */ - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - - pCtx->aOutputBuf = pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity; + SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i); + + pCtx->aOutputBuf = + pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity; pCtx->order = pQueryInfo->order.order; pCtx->functionId = pExpr->functionId; // input buffer hold only one point data - int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); - SSchema* pSchema = getColumnModelSchema(pDesc->pColumnModel, i); - + int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); + SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i); + pCtx->aInputElemBuf = pReducer->pTempBuffer->data + offset; // input data format comes from pModel @@ -87,10 +88,8 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu pCtx->hasNull = true; pCtx->currentStage = SECONDARY_STAGE_MERGE; - pRes->bytes[i] = pExpr->resBytes; - // for top/bottom function, the output of timestamp is the first column - int32_t functionId = pExpr->functionId; + int32_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf; pCtx->param[2].i64Key = pQueryInfo->order.order; @@ -106,12 +105,12 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu pCtx->resultInfo->superTableQ = true; } - int16_t n = 0; - int16_t tagLen = 0; - SQLFunctionCtx** pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, POINTER_BYTES); + int16_t n = 0; + int16_t tagLen = 0; + SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, POINTER_BYTES); - SQLFunctionCtx* pCtx = NULL; - for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + SQLFunctionCtx *pCtx = NULL; + for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pExpr->resBytes; @@ -217,12 +216,12 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd #ifdef _DEBUG_VIEW printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", pDS->filePage.numOfElems); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); - tColModelDisplayEx(pDesc->pColumnModel, pDS->filePage.data, pDS->filePage.numOfElems, pMemBuffer[0]->numOfElemsPerPage, - colInfo); + tColModelDisplayEx(pDesc->pColumnModel, pDS->filePage.data, pDS->filePage.numOfElems, + pMemBuffer[0]->numOfElemsPerPage, colInfo); #endif if (pDS->filePage.numOfElems == 0) { // no data in this flush tscTrace("%p flush data is empty, ignore %d flush record", pSqlObjAddr, idx); @@ -243,8 +242,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd param->pLocalData = pReducer->pLocalDataSrc; param->pDesc = pReducer->pDesc; param->numOfElems = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + param->groupOrderType = pQueryInfo->groupbyExpr.orderType; pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator); @@ -278,7 +277,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16; pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage)); - + int32_t finalRowLength = tscGetResRowLength(pQueryInfo); pReducer->resColModel = finalmodel; pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength; @@ -287,12 +286,12 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize); - if (pReducer->pTempBuffer == NULL|| pReducer->discardData == NULL || pReducer->pResultBuf == NULL || + if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || pReducer->pBufForInterpo == NULL || pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { tfree(pReducer->pTempBuffer); tfree(pReducer->discardData); tfree(pReducer->pResultBuf); - tfree(pReducer->pFinalRes); + tfree(pReducer->pFinalRes); tfree(pReducer->pBufForInterpo); tfree(pReducer->prevRowOfInput); @@ -308,23 +307,24 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd // we change the capacity of schema to denote that there is only one row in temp buffer pReducer->pDesc->pColumnModel->capacity = 1; - - //restore the limitation value at the last stage + + // restore the limitation value at the last stage if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pQueryInfo->limit.limit = pQueryInfo->clauseLimit; pQueryInfo->limit.offset = pQueryInfo->prjOffset; } - + pReducer->offset = pQueryInfo->limit.offset; - + pRes->pLocalReducer = pReducer; pRes->numOfGroups = 0; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - int16_t prec = pMeterMetaInfo->pMeterMeta->precision; + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + int16_t prec = pMeterMetaInfo->pMeterMeta->precision; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; - int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); + int64_t revisedSTime = + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, @@ -335,7 +335,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { pInterpoInfo->pTags[0] = (char *)pInterpoInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols; for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1); + SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1); pInterpoInfo->pTags[i] = pSchema->bytes + pInterpoInfo->pTags[i - 1]; } } else { @@ -387,7 +387,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data, int32_t numOfRows, int32_t orderType) { SColumnModel *pModel = pDesc->pColumnModel; - + if (pPage->numOfElems + numOfRows <= pModel->capacity) { tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows); return 0; @@ -444,11 +444,11 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { return; } - SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SSqlCmd * pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + // there is no more result, so we release all allocated resource - SLocalReducer *pLocalReducer = (SLocalReducer*)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); + SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); if (pLocalReducer != NULL) { int32_t status = 0; while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, @@ -460,19 +460,18 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { taosDestoryInterpoInfo(&pLocalReducer->interpolationInfo); if (pLocalReducer->pCtx != NULL) { - for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; - + tVariantDestroy(&pCtx->tag); if (pCtx->tagInfo.pTagCtxList != NULL) { tfree(pCtx->tagInfo.pTagCtxList); } } - + tfree(pLocalReducer->pCtx); } - tfree(pLocalReducer->prevRowOfInput); tfree(pLocalReducer->pTempBuffer); @@ -513,9 +512,9 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { } static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) { - int32_t numOfGroupByCols = 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + int32_t numOfGroupByCols = 0; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols; } @@ -555,17 +554,17 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId; // disable merge procedure for column projection query assert(functionId != TSDB_FUNC_ARITHM); - + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { return true; } - + if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { return false; } @@ -598,11 +597,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SSchema * pSchema = NULL; + SSchema * pSchema = NULL; SColumnModel *pModel = NULL; *pFinalModel = NULL; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pMeterMetaInfo->pMetricMeta->numOfVnodes); @@ -633,7 +632,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr if (rlen != 0) { capacity = nBufferSizes / rlen; } - + pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) { @@ -649,22 +648,32 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr // final result depends on the fields number memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs); for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - - SSchema* p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx); - + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); + + SSchema *p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx); + int16_t inter = 0; int16_t type = -1; int16_t bytes = 0; - - if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || - (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) { - // the final result size and type in the same as query on single table. - // so here, set the flag to be false; - getResultDataInfo(p1->type, p1->bytes, pExpr->functionId, 0, &type, &bytes, &inter, 0, false); - } else { + + // if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) || + // (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) || + // pExpr->functionId == TSDB_FUNC_LAST_ROW) { + // the final result size and type in the same as query on single table. + // so here, set the flag to be false; + + int32_t functionId = pExpr->functionId; + if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { type = pModel->pFields[i].field.type; bytes = pModel->pFields[i].field.bytes; + } else { + if (functionId == TSDB_FUNC_FIRST_DST) { + functionId = TSDB_FUNC_FIRST; + } else if (functionId == TSDB_FUNC_LAST_DST) { + functionId = TSDB_FUNC_LAST; + } + + getResultDataInfo(p1->type, p1->bytes, functionId, 0, &type, &bytes, &inter, 0, false); } pSchema[i].type = type; @@ -762,13 +771,15 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * } } -void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo, SInterpolationInfo *pInterpoInfo) { +void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, + SInterpolationInfo *pInterpoInfo) { // discard following dataset in the same group and reset the interpolation information - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - int16_t prec = pMeterMetaInfo->pMeterMeta->precision; + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + int16_t prec = pMeterMetaInfo->pMeterMeta->precision; int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; - int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); + int64_t revisedSTime = + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec); taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); @@ -781,7 +792,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo* } // todo merge with following function -//static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) { +// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) { // // for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { // TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); @@ -798,7 +809,8 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo* // } //} -static void reversedCopyFromInterpolationToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage **pResPages, SLocalReducer *pLocalReducer) { +static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages, + SLocalReducer *pLocalReducer) { assert(0); for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); @@ -822,11 +834,11 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo* pQueryInfo, SSqlRe * by "interuptHandler" function in shell */ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { - SSqlCmd * pCmd = &pSql->cmd; - SSqlRes * pRes = &pSql->res; - tFilePage *pFinalDataPage = pLocalReducer->pResultBuf; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SSqlCmd * pCmd = &pSql->cmd; + SSqlRes * pRes = &pSql->res; + tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (pRes->pLocalReducer != pLocalReducer) { /* * Release the SSqlObj is called, and it is int destroying function invoked by other thread. @@ -885,7 +897,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo } int64_t *pPrimaryKeys = (int64_t *)pLocalReducer->pBufForInterpo; - + SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; int64_t actualETime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; @@ -900,17 +912,18 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo int32_t *functions = (int32_t *)((char *)srcData + pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(void *)); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { - srcData[i] = pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows; + srcData[i] = + pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows; functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId; } - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - int8_t precision = pMeterMetaInfo->pMeterMeta->precision; + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + int8_t precision = pMeterMetaInfo->pMeterMeta->precision; while (1) { int32_t remains = taosNumOfRemainPoints(pInterpoInfo); - TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, - precision); + TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, + pQueryInfo->intervalTimeUnit, precision); int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime, pLocalReducer->resColModel->capacity); @@ -925,7 +938,8 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pQueryInfo->limit.offset > 0) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); - memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset, newRows * pField->bytes); + memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset, + newRows * pField->bytes); } } @@ -973,10 +987,10 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pQueryInfo->order.order == TSQL_SO_ASC) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); - int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i); + int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i); memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, pField->bytes * pRes->numOfRows); } - } else {//todo bug?? + } else { // todo bug?? reversedCopyFromInterpolationToDstBuf(pQueryInfo, pRes, pResPages, pLocalReducer); } } @@ -996,9 +1010,9 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) // copy to previous temp buffer for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) { - SSchema* pSchema = getColumnModelSchema(pColumnModel, i); - int16_t offset = getColumnModelOffset(pColumnModel, i); - + SSchema *pSchema = getColumnModelSchema(pColumnModel, i); + int16_t offset = getColumnModelOffset(pColumnModel, i); + memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes); } @@ -1006,11 +1020,11 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) pLocalReducer->hasPrevRow = true; } -static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, bool needInit) { +static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) { // the tag columns need to be set before all functions execution - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - for(int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j); SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j]; @@ -1040,7 +1054,7 @@ static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, } } -static void handleUnprocessedRow(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { +static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { if (pLocalReducer->hasUnprocessedRow) { pLocalReducer->hasUnprocessedRow = false; doExecuteSecondaryMerge(pCmd, pLocalReducer, true); @@ -1050,31 +1064,31 @@ static void handleUnprocessedRow(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, tF static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) { int64_t maxOutput = 0; - + for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) { -// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j]; -// if (pExpr == NULL) { -// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL); -// -// maxOutput = 1; -// continue; -// } + // SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j]; + // if (pExpr == NULL) { + // assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL); + // + // maxOutput = 1; + // continue; + // } /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output */ - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, j); - int32_t functionId = pExpr->functionId; + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j); + int32_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) { continue; } - + if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) { maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes; } } - + return maxOutput; } @@ -1084,7 +1098,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) * filled with the same result, which is the tags, specified in group by clause * */ -static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) { +static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) { int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); @@ -1095,7 +1109,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo assert(maxBufSize >= 0); - char *buf = malloc((size_t) maxBufSize); + char *buf = malloc((size_t)maxBufSize); for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); if (pExpr->functionId != TSDB_FUNC_TAG) { @@ -1117,7 +1131,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo free(buf); } -int32_t finalizeRes(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { +int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]); @@ -1139,7 +1153,7 @@ int32_t finalizeRes(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { * results generated by simple aggregation function, we merge them all into one points * *Exception*: column projection query, required no merge procedure */ -bool needToMerge(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { +bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { int32_t ret = 0; // merge all result by default int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId; @@ -1161,7 +1175,7 @@ bool needToMerge(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer, tFilePage return (ret == 0); } -static bool reachGroupResultLimit(SQueryInfo* pQueryInfo, SSqlRes *pRes) { +static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) { return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0); } @@ -1169,7 +1183,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pRes->numOfGroups += 1; // the output group is limited by the slimit clause @@ -1192,11 +1206,11 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups */ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) { - SSqlCmd * pCmd = &pSql->cmd; - SSqlRes * pRes = &pSql->res; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - tFilePage *pResBuf = pLocalReducer->pResultBuf; + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + tFilePage * pResBuf = pLocalReducer->pResultBuf; SColumnModel *pModel = pLocalReducer->resColModel; pRes->code = TSDB_CODE_SUCCESS; @@ -1224,11 +1238,10 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutputCols - pQueryInfo->groupbyExpr.numOfGroupCols; for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - int16_t offset = getColumnModelOffset(pModel, startIndex + i); - SSchema* pSchema = getColumnModelSchema(pModel, startIndex + i); - - memcpy(pInterpoInfo->pTags[i], - pLocalReducer->pBufForInterpo + offset * pResBuf->numOfElems, pSchema->bytes); + int16_t offset = getColumnModelOffset(pModel, startIndex + i); + SSchema *pSchema = getColumnModelSchema(pModel, startIndex + i); + + memcpy(pInterpoInfo->pTags[i], pLocalReducer->pBufForInterpo + offset * pResBuf->numOfElems, pSchema->bytes); } taosInterpoSetStartInfo(&pLocalReducer->interpolationInfo, pResBuf->numOfElems, pQueryInfo->interpoType); @@ -1237,7 +1250,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no return true; } -void resetOutputBuf(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning +void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity; @@ -1250,21 +1263,22 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer // In handling data in other groups, we need to reset the interpolation information for a new group data pRes->numOfRows = 0; pRes->numOfTotalInCurrentClause = 0; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + pQueryInfo->limit.offset = pLocalReducer->offset; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - int16_t precision = pMeterMetaInfo->pMeterMeta->precision; + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + int16_t precision = pMeterMetaInfo->pMeterMeta->precision; // for group result interpolation, do not return if not data is generated if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; - int64_t newTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision); + int64_t newTime = + taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision); - taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, pQueryInfo->groupbyExpr.numOfGroupCols, - pLocalReducer->rowSize); + taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, + pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); } } @@ -1276,12 +1290,12 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SLocalReducer * pLocalReducer = pRes->pLocalReducer; SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - int8_t p = pMeterMetaInfo->pMeterMeta->precision; + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + int8_t p = pMeterMetaInfo->pMeterMeta->precision; if (taosHasRemainsDataForInterpolation(pInterpoInfo)) { assert(pQueryInfo->interpoType != TSDB_INTERPO_NONE); @@ -1290,7 +1304,8 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1)); int32_t remain = taosNumOfRemainPoints(pInterpoInfo); - TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p); + TSKEY ekey = + taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p); int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo @@ -1312,9 +1327,9 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - int8_t precision = pMeterMetaInfo->pMeterMeta->precision; + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + int8_t precision = pMeterMetaInfo->pMeterMeta->precision; if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || prevGroupCompleted) { @@ -1322,7 +1337,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; - etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision); + etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, + pQueryInfo->intervalTimeUnit, precision); int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo @@ -1351,15 +1367,15 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { return false; } -static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) { - SSqlCmd * pCmd = &pSql->cmd; - SSqlRes * pRes = &pSql->res; - +static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + SLocalReducer *pLocalReducer = pRes->pLocalReducer; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { - SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) { + SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, k); SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k]; pCtx->aOutputBuf += pCtx->outputBytes * numOfRes; @@ -1376,24 +1392,24 @@ static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) { int32_t tscDoLocalreduce(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + tscResetForNextRetrieve(pRes); - + if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed tscTrace("%s call the drop local reducer", __FUNCTION__); tscDestroyLocalReducer(pSql); return 0; } - + SLocalReducer *pLocalReducer = pRes->pLocalReducer; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + // set the data merge in progress int32_t prevStatus = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS); if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) { - assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already + assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already return TSDB_CODE_SUCCESS; } @@ -1491,8 +1507,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) { * if the previous group does NOT generate any result (pResBuf->numOfElems == 0), * continue to process results instead of return results. */ - if ((!sameGroup && pResBuf->numOfElems > 0) || - (pResBuf->numOfElems == pLocalReducer->resColModel->capacity)) { + if ((!sameGroup && pResBuf->numOfElems > 0) || (pResBuf->numOfElems == pLocalReducer->resColModel->capacity)) { // does not belong to the same group bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup); @@ -1545,7 +1560,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } } else { // result buffer is not full - doMergeWithPrevRows(pSql, numOfRes); + doProcessResultInNextWindow(pSql, numOfRes); savePreviousRow(pLocalReducer, tmpBuffer); } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 121ff0c0e9..ca6c5ad236 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -643,9 +643,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0); tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0); - tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid); - + tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false); tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); + tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); pNew->cmd.numOfCols = 0; @@ -656,6 +656,10 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); + // this data needs to be transfer to support struct + pNewQueryInfo->fieldsInfo.numOfOutputCols = 0; + pNewQueryInfo->exprsInfo.numOfExprs = 0; + // set the ts,tags that involved in join, as the output column of intermediate result tscClearSubqueryInfo(&pNew->cmd); @@ -1537,7 +1541,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo); - int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->fieldsInfo.numOfOutputCols; + int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); // meter query without tags values @@ -1546,11 +1550,10 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { } SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids; - int32_t outputColumnSize = pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg); + int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg); int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE; if (pQueryInfo->tsBuf != NULL) { @@ -1787,7 +1790,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); - pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN); + pMsg += sizeof(SSqlFuncExprMsg); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); @@ -1862,6 +1865,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { *((int16_t *)pMsg) += pCol->flag; pMsg += sizeof(pCol->flag); + + memcpy(pMsg, pCol->name, tListLen(pCol->name)); + pMsg += tListLen(pCol->name); } } @@ -2491,16 +2497,8 @@ static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) { } for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { - TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); - int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i); - - pRes->bytes[i] = pField->bytes; -// if (pQueryInfo->order.order == TSQL_SO_DESC) { -// pRes->bytes[i] = -pRes->bytes[i]; -// pRes->tsrow[i] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes); -// } else { - pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows); -// } + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i); + pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows); } return 0; @@ -2725,8 +2723,10 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) { int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2; int32_t elemSize = sizeof(SMetricMetaElemMsg) * pQueryInfo->numOfTables; + + int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndexEx); - int32_t len = tagLen + joinCondLen + elemSize + defaultSize; + int32_t len = tagLen + joinCondLen + elemSize + colSize + defaultSize; return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE); } @@ -2854,6 +2854,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pDestCol->colIdx = htons(pCol->colIdx); pDestCol->colId = htons(pDestCol->colId); pDestCol->flag = htons(pDestCol->flag); + strncpy(pDestCol->name, pCol->name, tListLen(pCol->name)); pMsg += sizeof(SColIndexEx); } @@ -3291,6 +3292,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(SMeterMeta); pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer); + pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols; SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); @@ -3301,6 +3303,9 @@ int tscProcessShowRsp(SSqlObj *pSql) { index.columnIndex = i; tscColumnBaseInfoInsert(pQueryInfo, &index); tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pMeterSchema[i]); + + pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index, + pMeterSchema[i].type, pMeterSchema[i].bytes, pMeterSchema[i].bytes); } tscFieldInfoCalOffset(pQueryInfo); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e9e74413c6..57a3e88663 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -441,7 +441,10 @@ static void **doSetResultRowData(SSqlObj *pSql) { int32_t num = 0; for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) { - pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row; + SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i]; + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row; + } else { + assert(0); } // primary key column cannot be null in interval query, no need to check diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index bff58bd61a..8dc8ad49b5 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -246,8 +246,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf int32_t retry = tsProjectExecInterval; tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry); - tscClearSqlMetaInfoForce(&(pStream->pSql->cmd)); - tscSetRetryTimer(pStream, pStream->pSql, retry); return; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 65d41fe516..6dcf27b1d7 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -336,35 +336,17 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) { tfree(pQueryInfo->defaultVal); } -void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) { - /* remove the metermeta/metricmeta in cache */ - // taosRemoveDataFromCache(tscCacheHandle, (void**)&(pCmd->pMeterMeta), true); - // taosRemoveDataFromCache(tscCacheHandle, (void**)&(pCmd->pMetricMeta), true); -} - int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { if (pRes->tsrow == NULL) { -// int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols; - pRes->numOfnchar = numOfOutputCols; -// for (int32_t i = 0; i < numOfOutputCols; ++i) { -// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i); -// if (pField->type == TSDB_DATA_TYPE_NCHAR) { -// pRes->numOfnchar++; -// } -// } + pRes->numOfCols = numOfOutputCols; - pRes->tsrow = calloc(1, (POINTER_BYTES + sizeof(short)) * numOfOutputCols + POINTER_BYTES * pRes->numOfnchar); - pRes->bytes = calloc(numOfOutputCols, sizeof(short)); - -// if (pRes->numOfnchar > 0) { - pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols); -// } + pRes->tsrow = calloc(POINTER_BYTES, numOfOutputCols); + pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols); // not enough memory - if (pRes->tsrow == NULL || pRes->bytes == NULL || (pRes->buffer == NULL && pRes->numOfnchar > 0)) { + if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { tfree(pRes->tsrow); - tfree(pRes->bytes); tfree(pRes->buffer); pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -377,13 +359,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void tscDestroyResPointerInfo(SSqlRes* pRes) { if (pRes->buffer != NULL) { -// assert(pRes->numOfnchar > 0); // free all buffers containing the multibyte string - for (int i = 0; i < pRes->numOfnchar; i++) { + for (int i = 0; i < pRes->numOfCols; i++) { tfree(pRes->buffer[i]); } - pRes->numOfnchar = 0; + pRes->numOfCols = 0; } tfree(pRes->pRsp); @@ -392,7 +373,6 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { tfree(pRes->pGroupRec); tfree(pRes->pColumnIndex); tfree(pRes->buffer); - tfree(pRes->bytes); pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } @@ -930,10 +910,10 @@ void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionE void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) { SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo; - pExprInfo->pExprs[0].offset = 0; + pExprInfo->pExprs[0]->offset = 0; for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) { - pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes; + pExprInfo->pExprs[i]->offset = pExprInfo->pExprs[i - 1]->offset + pExprInfo->pExprs[i - 1]->resBytes; } } @@ -957,10 +937,10 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { return; } - pExprInfo->pExprs[0].offset = 0; + pExprInfo->pExprs[0]->offset = 0; for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) { - pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes; + pExprInfo->pExprs[i]->offset = pExprInfo->pExprs[i - 1]->offset + pExprInfo->pExprs[i - 1]->resBytes; } } @@ -976,6 +956,8 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList for (int32_t i = 0; i < size; ++i) { assert(indexList[i] >= 0 && indexList[i] <= src->numOfOutputCols); tscFieldInfoSetValFromField(dst, i, &src->pFields[indexList[i]]); + dst->pVisibleCols[i] = src->pVisibleCols[indexList[i]]; + dst->pSqlExpr[i] = src->pSqlExpr[indexList[i]]; } } } @@ -984,14 +966,14 @@ void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) { *dst = *src; dst->pFields = malloc(sizeof(TAOS_FIELD) * dst->numOfAlloc); -// dst->pOffset = malloc(sizeof(short) * dst->numOfAlloc); dst->pVisibleCols = malloc(sizeof(bool) * dst->numOfAlloc); dst->pSqlExpr = malloc(POINTER_BYTES * dst->numOfAlloc); + dst->pExpr = malloc(POINTER_BYTES * dst->numOfAlloc); memcpy(dst->pFields, src->pFields, sizeof(TAOS_FIELD) * dst->numOfOutputCols); -// memcpy(dst->pOffset, src->pOffset, sizeof(short) * dst->numOfOutputCols); memcpy(dst->pVisibleCols, src->pVisibleCols, sizeof(bool) * dst->numOfOutputCols); memcpy(dst->pSqlExpr, src->pSqlExpr, POINTER_BYTES * dst->numOfOutputCols); + memcpy(dst->pExpr, src->pExpr, POINTER_BYTES * dst->numOfOutputCols); } TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index) { @@ -1009,7 +991,7 @@ int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { return 0; } - return pQueryInfo->exprsInfo.pExprs[index].offset; + return pQueryInfo->exprsInfo.pExprs[index]->offset; } int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) { @@ -1039,7 +1021,7 @@ int32_t tscGetResRowLength(SQueryInfo* pQueryInfo) { int32_t size = 0; for(int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { - size += pQueryInfo->exprsInfo.pExprs[i].resBytes; + size += pQueryInfo->exprsInfo.pExprs[i]->resBytes; } return size; @@ -1050,7 +1032,6 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) { return; } -// tfree(pFieldInfo->pOffset); tfree(pFieldInfo->pFields); tfree(pFieldInfo->pVisibleCols); tfree(pFieldInfo->pSqlExpr); @@ -1101,11 +1082,12 @@ SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t f _exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1); _exprEvic(pExprInfo, index); - - SSqlExpr* pExpr = &pExprInfo->pExprs[index]; + + SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); pExpr->functionId = functionId; - + pExprInfo->numOfExprs++; + pExprInfo->pExprs[index] = pExpr; return pExpr; } @@ -1118,8 +1100,9 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi _exprCheckSpace(pExprInfo, pExprInfo->numOfExprs + 1); _exprEvic(pExprInfo, index); - SSqlExpr* pExpr = &pExprInfo->pExprs[index]; - + SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); + pExprInfo->pExprs[index] = pExpr; + pExpr->functionId = functionId; int16_t numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns; @@ -1161,7 +1144,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi return NULL; } - SSqlExpr* pExpr = &pExprInfo->pExprs[index]; + SSqlExpr* pExpr = pExprInfo->pExprs[index]; pExpr->functionId = functionId; @@ -1196,7 +1179,7 @@ SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) { return NULL; } - return &pQueryInfo->exprsInfo.pExprs[index]; + return pQueryInfo->exprsInfo.pExprs[index]; } void* tscSqlExprDestroy(SSqlExpr* pExpr) { @@ -1208,6 +1191,8 @@ void* tscSqlExprDestroy(SSqlExpr* pExpr) { tVariantDestroy(&pExpr->param[i]); } + tfree(pExpr); + return NULL; } @@ -1219,8 +1204,8 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) { return; } - for(int32_t i = 0; i < pExprInfo->numOfAlloc; ++i) { - tscSqlExprDestroy(&pExprInfo->pExprs[i]); + for(int32_t i = 0; i < pExprInfo->numOfExprs; ++i) { + tscSqlExprDestroy(pExprInfo->pExprs[i]); } tfree(pExprInfo->pExprs); @@ -1230,27 +1215,40 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) { } -void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid) { +void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) { if (src == NULL) { return; } *dst = *src; - dst->pExprs = calloc(dst->numOfAlloc, sizeof(SSqlExpr)); + dst->pExprs = calloc(dst->numOfAlloc, POINTER_BYTES); + int16_t num = 0; for (int32_t i = 0; i < src->numOfExprs; ++i) { - if (src->pExprs[i].uid == tableuid) { - dst->pExprs[num++] = src->pExprs[i]; + if (src->pExprs[i]->uid == tableuid) { + + if (deepcopy) { + dst->pExprs[num] = calloc(1, sizeof(SSqlExpr)); + *dst->pExprs[num] = *src->pExprs[i]; + } else { + dst->pExprs[num] = src->pExprs[i]; + } + + num++; } } dst->numOfExprs = num; - for (int32_t i = 0; i < dst->numOfExprs; ++i) { - for (int32_t j = 0; j < src->pExprs[i].numOfParams; ++j) { - tVariantAssign(&dst->pExprs[i].param[j], &src->pExprs[i].param[j]); + + if (deepcopy) { + for (int32_t i = 0; i < dst->numOfExprs; ++i) { + for (int32_t j = 0; j < src->pExprs[i]->numOfParams; ++j) { + tVariantAssign(&dst->pExprs[i]->param[j], &src->pExprs[i]->param[j]); + } } } + } static void clearVal(SColumnBase* pBase) { @@ -2005,7 +2003,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } uint64_t uid = pMeterMetaInfo->pMeterMeta->uid; - tscSqlExprCopy(&pNewQueryInfo->exprsInfo, &pQueryInfo->exprsInfo, uid); + tscSqlExprCopy(&pNewQueryInfo->exprsInfo, &pQueryInfo->exprsInfo, uid, true); int32_t numOfOutputCols = pNewQueryInfo->exprsInfo.numOfExprs; @@ -2020,7 +2018,19 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutputCols); free(indexList); - + + // make sure the the sqlExpr for each fields is correct +// todo handle the agg arithmetic expression + for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutputCols; ++f) { + char* name = pNewQueryInfo->fieldsInfo.pFields[f].name; + for(int32_t k1 = 0; k1 < pNewQueryInfo->exprsInfo.numOfExprs; ++k1) { + SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); + if (strcmp(name, pExpr1->aliasName) == 0) { + pNewQueryInfo->fieldsInfo.pSqlExpr[f] = pExpr1; + } + } + } + tscFieldInfoUpdateOffsetForInterResult(pNewQueryInfo); } diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index b3f095507f..9b53e5d84b 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -298,6 +298,9 @@ void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, in bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval); +bool stableQueryFunctChanged(int32_t funcId); + + void resetResultInfo(SResultInfo *pResInfo); void initResultInfo(SResultInfo *pResInfo); void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable); diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index e07ee6d13a..e82078685a 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -1056,7 +1056,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); - pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN); + pMsg += sizeof(SSqlFuncExprMsg); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);