diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 35f3b42811..950d81fb61 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -210,7 +210,8 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta); SColumn* tscColumnClone(const SColumn* src); -bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); +void tscColumnCopy(SColumn* pDest, const SColumn* pSrc); +int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid); SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema); void tscColumnListDestroy(SArray* pColList); void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index bac8920d8f..8bb776ffee 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1156,27 +1156,6 @@ static void insertBatchClean(STscStmt* pStmt) { tfree(pCmd->insertParam.pTableNameList); -/* - STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); - - STableDataBlocks* pOneTableBlock = *p; - - while (1) { - SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; - - pOneTableBlock->size = sizeof(SSubmitBlk); - - pBlocks->numOfRows = 0; - - p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p); - if (p == NULL) { - break; - } - - pOneTableBlock = *p; - } -*/ - pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.numOfTables = 0; @@ -1499,7 +1478,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pRes->numOfRows = 1; strtolower(pSql->sqlstr, sql); - tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); + tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); if (tscIsInsertData(pSql->sqlstr)) { pStmt->isInsert = true; @@ -1604,7 +1583,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) { SHashObj* hashList = pCmd->insertParam.pTableBlockHashList; pCmd->insertParam.pTableBlockHashList = NULL; - tscResetSqlCmd(pCmd, true); + tscResetSqlCmd(pCmd, false); pCmd->insertParam.pTableBlockHashList = hashList; } @@ -1663,7 +1642,7 @@ int taos_stmt_close(TAOS_STMT* stmt) { } else { if (pStmt->multiTbInsert) { taosHashCleanup(pStmt->mtb.pTableHash); - pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true); + pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, false); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL; taosArrayDestroy(pStmt->mtb.tags); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b2b188661d..67e0667ee0 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1699,7 +1699,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32 // arithmetic expression always return result in the format of double float pExprInfo->base.resBytes = sizeof(double); - pExprInfo->base.interBytes = sizeof(double); + pExprInfo->base.interBytes = 0; pExprInfo->base.resType = TSDB_DATA_TYPE_DOUBLE; pExprInfo->base.functionId = TSDB_FUNC_ARITHM; @@ -1934,14 +1934,14 @@ SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tab index.columnIndex = colIndex; } - return tscExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, pSchema->bytes, + return tscExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, colId, 0, (functionId == TSDB_FUNC_TAGPRJ)); } SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag, int16_t colId) { SExprInfo* pExpr = tscExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, - pColSchema->bytes, colId, pColSchema->bytes, TSDB_COL_IS_TAG(flag)); + pColSchema->bytes, colId, 0, TSDB_COL_IS_TAG(flag)); tstrncpy(pExpr->base.aliasName, pColSchema->name, sizeof(pExpr->base.aliasName)); tstrncpy(pExpr->base.token, pColSchema->name, sizeof(pExpr->base.token)); @@ -2096,7 +2096,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS type = pSchema->type; bytes = pSchema->bytes; } - + SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pCmd), bytes, false); tstrncpy(pExpr->base.aliasName, name, tListLen(pExpr->base.aliasName)); @@ -2168,6 +2168,17 @@ static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) { } } +static UNUSED_FUNC void updateFunctionInterBuf(SQueryInfo* pQueryInfo, bool superTable) { + size_t numOfExpr = tscNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < numOfExpr; ++i) { + SExprInfo* pExpr = tscExprGet(pQueryInfo, i); + + int32_t param = (int32_t)pExpr->base.param[0].i64; + getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, pExpr->base.functionId, param, &pExpr->base.resType, &pExpr->base.resBytes, + &pExpr->base.interBytes, 0, superTable); + } +} + int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult) { STableMetaInfo* pTableMetaInfo = NULL; int32_t functionId = pItem->pNode->functionId; @@ -2277,10 +2288,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_LEASTSQR: { // 1. valid the number of parameters int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->pParam); + + // no parameters or more than one parameter for function if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) || ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) { - /* no parameters or more than one parameter for function */ return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -2294,14 +2306,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta); + + // functions can not be applied to tags + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta))) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } // 2. check if sql function can be applied on this column data type - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta); - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); if (!IS_NUMERIC_TYPE(pSchema->type)) { @@ -2330,11 +2343,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr); } - // functions can not be applied to tags - if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } - SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false); if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters @@ -2363,9 +2371,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } if (info.precision == TSDB_TIME_PRECISION_MILLI) { - tickPerSec /= 1000000; + tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MICRO); } else if (info.precision == TSDB_TIME_PRECISION_MICRO) { - tickPerSec /= 1000; + tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI); } if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) { @@ -2598,7 +2606,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // set the first column ts for top/bottom query SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pCmd), - TSDB_KEYSIZE, false); + 0, false); tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName)); const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; @@ -3113,15 +3121,10 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) return true; } - if (pQueryInfo->groupbyExpr.numOfGroupCols != 1) { + SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); + if (pColIndex->colIndex != TSDB_TBNAME_COLUMN_INDEX) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return true; - } else { - SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); - if (pColIndex->colIndex != TSDB_TBNAME_COLUMN_INDEX) { - invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - return true; - } } } else if (tscIsSessionWindowQuery(pQueryInfo)) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); @@ -3675,7 +3678,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid)) { + if (tscColumnExists(pTableMetaInfo->tagColList, pTagSchema1->colId, pTableMetaInfo->pTableMeta->id.uid) < 0) { tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pTagSchema1); if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) { @@ -3707,7 +3710,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMeta); - if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid)) { + if (tscColumnExists(pTableMetaInfo->tagColList, pTagSchema2->colId, pTableMeta->id.uid) < 0) { tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pTagSchema2); if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) { @@ -7905,6 +7908,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf return code; } +// updateFunctionInterBuf(pQueryInfo, false); updateLastScanOrderIfNeeded(pQueryInfo); } else { pQueryInfo->command = TSDB_SQL_SELECT; @@ -8033,6 +8037,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf updateLastScanOrderIfNeeded(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo); +// updateFunctionInterBuf(pQueryInfo, isSTable); if ((code = validateFillNode(pCmd, pQueryInfo, pSqlNode)) != TSDB_CODE_SUCCESS) { return code; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8c5e99474d..c3cf63bd26 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -795,6 +795,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, pSqlExpr->colBytes = htons(pExpr->colBytes); pSqlExpr->resType = htons(pExpr->resType); pSqlExpr->resBytes = htons(pExpr->resBytes); + pSqlExpr->interBytes = htonl(pExpr->interBytes); pSqlExpr->functionId = htons(pExpr->functionId); pSqlExpr->numOfParams = htons(pExpr->numOfParams); pSqlExpr->resColId = htons(pExpr->resColId); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 22a603b71e..c3df4773e1 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -103,13 +103,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { pthread_mutex_lock(&subState->mutex); -// bool done = allSubqueryDone(pParentSql); -// if (done) { -// tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx); -// pthread_mutex_unlock(&subState->mutex); -// return false; -// } - tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx); subState->states[idx] = 1; @@ -2389,8 +2382,14 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { SColumn *pCol = taosArrayGetP(pColList, i); if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. - SColumn *p = tscColumnClone(pCol); - taosArrayPush(pNewQueryInfo->colList, &p); + int32_t index1 = tscColumnExists(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid); + if (index1 >= 0) { + SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1); + tscColumnCopy(x, pCol); + } else { + SColumn *p = tscColumnClone(pCol); + taosArrayPush(pNewQueryInfo->colList, &p); + } } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9d2c500a92..74dbe42eeb 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1332,7 +1332,7 @@ void tscFreeSubobj(SSqlObj* pSql) { tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - tscDebug("0x%"PRIx64" free sub SqlObj:%p, index:%d", pSql->self, pSql->pSubs[i], i); + tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i); taos_free_result(pSql->pSubs[i]); pSql->pSubs[i] = NULL; } @@ -1784,7 +1784,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl tscSortRemoveDataBlockDupRows(pOneTableBlock); char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); - tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), + tscDebug("0x%"PRIx64" name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName), pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); @@ -2270,18 +2270,14 @@ int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy) { return 0; } -bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) { - // ignore the tbname columnIndex to be inserted into source list - if (columnIndex < 0) { - return false; - } - +// ignore the tbname columnIndex to be inserted into source list +int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid) { size_t numOfCols = taosArrayGetSize(pColumnList); int32_t i = 0; while (i < numOfCols) { SColumn* pCol = taosArrayGetP(pColumnList, i); - if ((pCol->columnIndex != columnIndex) || (pCol->tableUid != uid)) { + if ((pCol->info.colId != columnId) || (pCol->tableUid != uid)) { ++i; continue; } else { @@ -2290,10 +2286,10 @@ bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) { } if (i >= numOfCols || numOfCols == 0) { - return false; + return -1; } - return true; + return i; } void tscExprAssign(SExprInfo* dst, const SExprInfo* src) { @@ -2379,13 +2375,7 @@ SColumn* tscColumnClone(const SColumn* src) { return NULL; } - dst->columnIndex = src->columnIndex; - dst->tableUid = src->tableUid; - dst->info.flist.numOfFilters = src->info.flist.numOfFilters; - dst->info.flist.filterInfo = tFilterInfoDup(src->info.flist.filterInfo, src->info.flist.numOfFilters); - dst->info.type = src->info.type; - dst->info.colId = src->info.colId; - dst->info.bytes = src->info.bytes; + tscColumnCopy(dst, src); return dst; } @@ -2394,6 +2384,18 @@ static void tscColumnDestroy(SColumn* pCol) { free(pCol); } +void tscColumnCopy(SColumn* pDest, const SColumn* pSrc) { + destroyFilterInfo(&pDest->info.flist); + + pDest->columnIndex = pSrc->columnIndex; + pDest->tableUid = pSrc->tableUid; + pDest->info.flist.numOfFilters = pSrc->info.flist.numOfFilters; + pDest->info.flist.filterInfo = tFilterInfoDup(pSrc->info.flist.filterInfo, pSrc->info.flist.numOfFilters); + pDest->info.type = pSrc->info.type; + pDest->info.colId = pSrc->info.colId; + pDest->info.bytes = pSrc->info.bytes; +} + void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid) { assert(src != NULL && dst != NULL); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index dac2dc84b6..365f24e126 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -100,7 +100,7 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_NANO_STR "ns" -#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)) +#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))) #define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member) #define T_APPEND_MEMBER(dst, ptr, type, member) \ diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 032c6ee94b..ae37f74fa3 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1740,16 +1740,22 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } -static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) { +static int32_t doGetVgroupInfoLength(char* name) { + SSTableObj *pTable = mnodeGetSuperTable(name); + int32_t len = 0; + if (pTable != NULL && pTable->vgHash != NULL) { + len = (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg)); + } + + mnodeDecTableRef(pTable); + return len; +} + +static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) { int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; - SSTableObj *pTable = mnodeGetSuperTable(stableName); - if (pTable != NULL && pTable->vgHash != NULL) { - contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg)); - } - - mnodeDecTableRef(pTable); + contLen += doGetVgroupInfoLength(stableName); } return contLen; @@ -1820,7 +1826,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { int32_t numOfTable = htonl(pInfo->numOfTables); // calculate the required space. - int32_t contLen = calculateVgroupMsgLength(pInfo, numOfTable); + int32_t contLen = getVgroupInfoLength(pInfo, numOfTable); SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -2860,6 +2866,27 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { } } +static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray* pList, int32_t* totalMallocLen, int32_t numOfVgroupList) { + int32_t len = 0; + for (int32_t i = 0; i < numOfVgroupList; ++i) { + char *name = taosArrayGetP(pList, i); + len += doGetVgroupInfoLength(name); + } + + if (len + pMultiMeta->contLen > (*totalMallocLen)) { + while (len + pMultiMeta->contLen > (*totalMallocLen)) { + (*totalMallocLen) *= 2; + } + + pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen); + if (pMultiMeta == NULL) { + return NULL; + } + } + + return pMultiMeta; +} + static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; @@ -2950,8 +2977,6 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { } } - char* msg = (char*) pMultiMeta + pMultiMeta->contLen; - // add the additional super table names that needs the vgroup info for(;t < num; ++t) { taosArrayPush(pList, &nameList[t]); @@ -2961,6 +2986,13 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList); pMultiMeta->numOfVgroup = htonl(numOfVgroupList); + pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList); + if (pMultiMeta == NULL) { + code = TSDB_CODE_MND_OUT_OF_MEMORY; + goto _end; + } + + char* msg = (char*) pMultiMeta + pMultiMeta->contLen; for(int32_t i = 0; i < numOfVgroupList; ++i) { char* name = taosArrayGetP(pList, i); diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 57e7d2982f..044c538f47 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -204,7 +204,7 @@ typedef struct SAggFunctionInfo { bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function - void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function +// void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function // finalizer must be called after all xFunction has been executed to generated final result. void (*xFinalize)(SQLFunctionCtx *pCtx); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 955dd734cf..9cd1c5b033 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -295,7 +295,7 @@ enum OPERATOR_TYPE_E { OP_MultiTableAggregate = 14, OP_MultiTableTimeInterval = 15, OP_DummyInput = 16, //TODO remove it after fully refactor. - OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. + OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. OP_GlobalAggregate = 18, // global merge for the multi-way data sources. OP_Filter = 19, OP_Distinct = 20, diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index a54d46974a..ce2c3e3616 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -470,7 +470,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). { //////////////////////// The SELECT statement ///////////////////////////////// %type select {SSqlNode*} %destructor select {destroySqlNode($$);} -select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { +select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) having_opt(N) orderby_opt(Z) slimit_opt(G) limit_opt(L). { A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N); } diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index bc14c75af5..8efc4aad4c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -456,20 +456,6 @@ static void count_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, numOfElem, 1); } -static void count_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - *((int64_t *)pCtx->pOutput) += pCtx->size; - - // do not need it actually - SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); - pInfo->hasResult = DATA_SET_FLAG; -} - static void count_func_merge(SQLFunctionCtx *pCtx) { int64_t *pData = (int64_t *)GET_INPUT_DATA_LIST(pCtx); for (int32_t i = 0; i < pCtx->size; ++i) { @@ -609,46 +595,6 @@ static void do_sum(SQLFunctionCtx *pCtx) { } } -static void do_sum_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - int64_t *res = (int64_t*) pCtx->pOutput; - - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - *res += GET_INT8_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - *res += GET_INT16_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - *res += GET_INT32_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - *res += GET_INT64_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { - uint64_t *r = (uint64_t *)pCtx->pOutput; - *r += GET_UINT8_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { - uint64_t *r = (uint64_t *)pCtx->pOutput; - *r += GET_UINT16_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { - uint64_t *r = (uint64_t *)pCtx->pOutput; - *r += GET_UINT32_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { - uint64_t *r = (uint64_t *)pCtx->pOutput; - *r += GET_UINT64_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - double *retVal = (double*) pCtx->pOutput; - *retVal += GET_DOUBLE_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - double *retVal = (double*) pCtx->pOutput; - *retVal += GET_FLOAT_VAL(pData); - } - - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; -} - static void sum_function(SQLFunctionCtx *pCtx) { do_sum(pCtx); @@ -661,17 +607,6 @@ static void sum_function(SQLFunctionCtx *pCtx) { } } -static void sum_function_f(SQLFunctionCtx *pCtx, int32_t index) { - do_sum_f(pCtx, index); - - // keep the result data in output buffer, not in the intermediate buffer - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { - SSumInfo *pSum = (SSumInfo *)pCtx->pOutput; - pSum->hasResult = DATA_SET_FLAG; - } -} - static void sum_func_merge(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; @@ -847,53 +782,6 @@ static void avg_function(SQLFunctionCtx *pCtx) { } } -static void avg_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - - // NOTE: keep the intermediate result into the interResultBuf - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - - SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); - - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - pAvgInfo->sum += GET_INT8_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - pAvgInfo->sum += GET_INT16_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - pAvgInfo->sum += GET_INT32_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - pAvgInfo->sum += GET_INT64_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - pAvgInfo->sum += GET_DOUBLE_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - pAvgInfo->sum += GET_FLOAT_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { - pAvgInfo->sum += GET_UINT8_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { - pAvgInfo->sum += GET_UINT16_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { - pAvgInfo->sum += GET_UINT32_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { - pAvgInfo->sum += GET_UINT64_VAL(pData); - } - - // restore sum and count of elements - pAvgInfo->num += 1; - - // set has result flag - pResInfo->hasResult = DATA_SET_FLAG; - - // keep the data into the final output buffer for super table query since this execution may be the last one - if (pCtx->stableQuery) { - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); - } -} - static void avg_func_merge(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -1307,78 +1195,6 @@ static void max_func_merge(SQLFunctionCtx *pCtx) { } } -static void minMax_function_f(SQLFunctionCtx *pCtx, int32_t index, int32_t isMin) { - char *pData = GET_INPUT_DATA(pCtx, index); - TSKEY key = GET_TS_DATA(pCtx, index); - - int32_t num = 0; - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - int8_t *output = (int8_t *)pCtx->pOutput; - int8_t i = GET_INT8_VAL(pData); - - UPDATE_DATA(pCtx, *output, i, num, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - int16_t *output = (int16_t*) pCtx->pOutput; - int16_t i = GET_INT16_VAL(pData); - - UPDATE_DATA(pCtx, *output, i, num, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - int32_t *output = (int32_t*) pCtx->pOutput; - int32_t i = GET_INT32_VAL(pData); - - UPDATE_DATA(pCtx, *output, i, num, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - int64_t *output = (int64_t*) pCtx->pOutput; - int64_t i = GET_INT64_VAL(pData); - - UPDATE_DATA(pCtx, *output, i, num, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - float *output = (float*) pCtx->pOutput; - float i = GET_FLOAT_VAL(pData); - - UPDATE_DATA(pCtx, *output, i, num, isMin, key); - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - double *output = (double*) pCtx->pOutput; - double i = GET_DOUBLE_VAL(pData); - - UPDATE_DATA(pCtx, *output, i, num, isMin, key); - } - - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; -} - -static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - minMax_function_f(pCtx, index, 0); - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { - char *flag = pCtx->pOutput + pCtx->inputBytes; - *flag = DATA_SET_FLAG; - } -} - -static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - minMax_function_f(pCtx, index, 1); - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { - char *flag = pCtx->pOutput + pCtx->inputBytes; - *flag = DATA_SET_FLAG; - } -} - #define LOOP_STDDEV_IMPL(type, r, d, ctx, delta, _type, num) \ for (int32_t i = 0; i < (ctx)->size; ++i) { \ if ((ctx)->hasNull && isNull((char *)&((type *)d)[i], (_type))) { \ @@ -1472,114 +1288,6 @@ static void stddev_function(SQLFunctionCtx *pCtx) { } } -static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) { - // the second stage to calculate standard deviation - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); - - if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) { - pStd->stage++; - avg_finalizer(pCtx); - - pResInfo->initialized = true; // set it initialized to avoid re-initialization - - // save average value into tmpBuf, for second stage scan - SAvgInfo *pAvg = GET_ROWCELL_INTERBUF(pResInfo); - - pStd->avg = GET_DOUBLE_VAL(pCtx->pOutput); - assert((isnan(pAvg->sum) && pAvg->num == 0) || (pStd->num == pAvg->num && pStd->avg == pAvg->sum)); - } - - /* the first stage is to calculate average value */ - if (pStd->stage == 0) { - avg_function_f(pCtx, index); - } else if (pStd->num > 0) { - double avg = pStd->avg; - void * pData = GET_INPUT_DATA(pCtx, index); - - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - pStd->res += POW2(GET_INT32_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_FLOAT: { - pStd->res += POW2(GET_FLOAT_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - pStd->res += POW2(GET_DOUBLE_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - pStd->res += POW2(GET_INT64_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - pStd->res += POW2(GET_INT16_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - pStd->res += POW2(GET_INT8_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_UINT: { - pStd->res += POW2(GET_UINT32_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_UBIGINT: { - pStd->res += POW2(GET_UINT64_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_USMALLINT: { - pStd->res += POW2(GET_UINT16_VAL(pData) - avg); - break; - } - case TSDB_DATA_TYPE_UTINYINT: { - pStd->res += POW2(GET_UINT8_VAL(pData) - avg); - break; - } - default: - qError("stddev function not support data type:%d", pCtx->inputType); - } - - SET_VAL(pCtx, 1, 1); - } -} - -static UNUSED_FUNC void stddev_next_step(SQLFunctionCtx *pCtx) { - /* - * the stddevInfo and the average info struct share the same buffer area - * And the position of each element in their struct is exactly the same matched - */ - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); - - if (pStd->stage == 0) { - /* - * stddev is calculated in two stage: - * 1. get the average value of all data; - * 2. get final result, based on the average values; - * so, if this routine is in second stage, no further step is required - */ - pStd->stage++; - avg_finalizer(pCtx); - - pResInfo->initialized = true; // set it initialized to avoid re-initialization - - // save average value into tmpBuf, for second stage scan - SAvgInfo *pAvg = GET_ROWCELL_INTERBUF(pResInfo); - - pStd->avg = GET_DOUBLE_VAL(pCtx->pOutput); - assert((isnan(pAvg->sum) && pAvg->num == 0) || (pStd->num == pAvg->num && pStd->avg == pAvg->sum)); - } else { - pResInfo->complete = true; - } -} - static void stddev_finalizer(SQLFunctionCtx *pCtx) { SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); @@ -1696,97 +1404,6 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) { memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo)); } -static void stddev_dst_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - // the second stage to calculate standard deviation - SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - double *retVal = &pStd->res; - - // all data are null, no need to proceed - SArray* resList = (SArray*) pCtx->param[0].pz; - if (resList == NULL) { - return; - } - - // find the correct group average results according to the tag value - int32_t len = (int32_t) taosArrayGetSize(resList); - assert(len > 0); - - double avg = 0; - if (len == 1) { - SResPair* p = taosArrayGet(resList, 0); - avg = p->avg; - } else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result - SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare); - assert(p != NULL); - - avg = p->avg; - } - - int32_t num = 0; - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - for (int32_t i = 0; i < pCtx->size; ++i) { - if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) { - continue; - } - num += 1; - *retVal += POW2(((int32_t *)pData)[i] - avg); - } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UTINYINT: { - LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_USMALLINT: { - LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UINT: { - LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - case TSDB_DATA_TYPE_UBIGINT: { - LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); - break; - } - default: - qError("stddev function not support data type:%d", pCtx->inputType); - } - - pStd->num += num; - SET_VAL(pCtx, num, 1); - - // copy to the final output buffer for super table - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo)); -} - - static void stddev_dst_merge(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo); @@ -1833,7 +1450,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) { // todo opt for null block static void first_function(SQLFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_DESC /*|| pCtx->preAggVals.dataBlockLoaded == false*/) { + if (pCtx->order == TSDB_ORDER_DESC) { return; } @@ -1862,27 +1479,6 @@ static void first_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); } -static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) { - if (pCtx->order == TSDB_ORDER_DESC) { - return; - } - - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - memcpy(pCtx->pOutput, pData, pCtx->inputBytes); - - TSKEY ts = GET_TS_DATA(pCtx, index); - DO_UPDATE_TAG_COLUMNS(pCtx, ts); - - SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); - pInfo->hasResult = DATA_SET_FLAG; - pInfo->complete = true; // get the first not-null data, completed -} - static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t index) { int64_t *timestamp = GET_TS_LIST(pCtx); @@ -1932,21 +1528,6 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); } -static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - if (pCtx->order == TSDB_ORDER_DESC) { - return; - } - - first_data_assign_impl(pCtx, pData, index); - - SET_VAL(pCtx, 1, 1); -} - static void first_dist_func_merge(SQLFunctionCtx *pCtx) { assert(pCtx->stableQuery); @@ -1978,70 +1559,55 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { * least one data in this block that is not null.(TODO opt for this case) */ static void last_function(SQLFunctionCtx *pCtx) { - if (pCtx->order != pCtx->param[0].i64/* || pCtx->preAggVals.dataBlockLoaded == false*/) { - return; - } - - int32_t notNullElems = 0; - - for (int32_t i = pCtx->size - 1; i >= 0; --i) { - char *data = GET_INPUT_DATA(pCtx, i); - if (pCtx->hasNull && isNull(data, pCtx->inputType)) { - if (!pCtx->requireNull) { - continue; - } - } - - memcpy(pCtx->pOutput, data, pCtx->inputBytes); - - TSKEY ts = GET_TS_DATA(pCtx, i); - DO_UPDATE_TAG_COLUMNS(pCtx, ts); - - SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); - pInfo->hasResult = DATA_SET_FLAG; - - pInfo->complete = true; // set query completed on this column - notNullElems++; - break; - } - - SET_VAL(pCtx, notNullElems, 1); -} - -static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - // the scan order is not the required order, ignore it if (pCtx->order != pCtx->param[0].i64) { return; } + SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); + + int32_t notNullElems = 0; if (pCtx->order == TSDB_ORDER_DESC) { - SET_VAL(pCtx, 1, 1); - memcpy(pCtx->pOutput, pData, pCtx->inputBytes); - TSKEY ts = GET_TS_DATA(pCtx, index); - DO_UPDATE_TAG_COLUMNS(pCtx, ts); + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *data = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { + continue; + } - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; - pResInfo->complete = true; // set query completed - } else { // in case of ascending order check, all data needs to be checked - SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); - TSKEY ts = GET_TS_DATA(pCtx, index); + memcpy(pCtx->pOutput, data, pCtx->inputBytes); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); - if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { - pResInfo->hasResult = DATA_SET_FLAG; - memcpy(pCtx->pOutput, pData, pCtx->inputBytes); - - *(TSKEY*)buf = ts; + TSKEY ts = GET_TS_DATA(pCtx, i); DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; // set query completed on this column + notNullElems++; + break; + } + } else { // ascending order + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *data = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { + continue; + } + + TSKEY ts = GET_TS_DATA(pCtx, i); + + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { + pResInfo->hasResult = DATA_SET_FLAG; + memcpy(pCtx->pOutput, data, pCtx->inputBytes); + + *(TSKEY*)buf = ts; + DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } + + notNullElems++; + break; } } + + SET_VAL(pCtx, notNullElems, 1); } static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t index) { @@ -2092,29 +1658,6 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); } -static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { - if (pCtx->size == 0) { - return; - } - - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - /* - * 1. for scan data in asc order, no need to check data - * 2. for data blocks that are not loaded, no need to check data - */ - if (pCtx->order != pCtx->param[0].i64) { - return; - } - - last_data_assign_impl(pCtx, pData, index); - - SET_VAL(pCtx, 1, 1); -} - /* * in the secondary merge(local reduce), the output is limited by the * final output size, so the main difference between last_dist_func_merge and second_merge @@ -2616,28 +2159,6 @@ static void top_function(SQLFunctionCtx *pCtx) { } } -static void top_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - STopBotInfo *pRes = getTopBotOutputInfo(pCtx); - assert(pRes->num >= 0); - - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { - buildTopBotStruct(pRes, pCtx); - } - - SET_VAL(pCtx, 1, 1); - TSKEY ts = GET_TS_DATA(pCtx, index); - - do_top_function_add(pRes, (int32_t)pCtx->param[0].i64, pData, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; -} - static void top_func_merge(SQLFunctionCtx *pCtx) { STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_DATA_LIST(pCtx); @@ -2695,27 +2216,6 @@ static void bottom_function(SQLFunctionCtx *pCtx) { } } -static void bottom_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - TSKEY ts = GET_TS_DATA(pCtx, index); - - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - STopBotInfo *pRes = getTopBotOutputInfo(pCtx); - - if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { - buildTopBotStruct(pRes, pCtx); - } - - SET_VAL(pCtx, 1, 1); - do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64, pData, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0); - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; -} - static void bottom_func_merge(SQLFunctionCtx *pCtx) { STopBotInfo *pInput = (STopBotInfo *)GET_INPUT_DATA_LIST(pCtx); @@ -2868,50 +2368,6 @@ static void percentile_function(SQLFunctionCtx *pCtx) { pResInfo->hasResult = DATA_SET_FLAG; } -static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo); - - if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { - pInfo->stage += 1; - - // all data are null, set it completed - if (pInfo->numOfElems == 0) { - pResInfo->complete = true; - - return; - } else { - pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); - } - } - - if (pInfo->stage == 0) { - double v = 0; - GET_TYPED_DATA(v, double, pCtx->inputType, pData); - - if (v < GET_DOUBLE_VAL(&pInfo->minval)) { - SET_DOUBLE_VAL(&pInfo->minval, v); - } - - if (v > GET_DOUBLE_VAL(&pInfo->maxval)) { - SET_DOUBLE_VAL(&pInfo->maxval, v); - } - - pInfo->numOfElems += 1; - return; - } - - tMemBucketPut(pInfo->pMemBucket, pData, 1); - - SET_VAL(pCtx, 1, 1); - pResInfo->hasResult = DATA_SET_FLAG; -} - static void percentile_finalizer(SQLFunctionCtx *pCtx) { double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; @@ -2930,24 +2386,6 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } -static UNUSED_FUNC void percentile_next_step(SQLFunctionCtx *pCtx) { - SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - if (pInfo->stage == 0) { - // all data are null, set it completed - if (pInfo->numOfElems == 0) { - pResInfo->complete = true; - } else { - pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); - } - - pInfo->stage += 1; - } else { - pResInfo->complete = true; - } -} - ////////////////////////////////////////////////////////////////////////////////// static void buildHistogramInfo(SAPercentileInfo* pInfo) { pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo)); @@ -3012,24 +2450,6 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { } } -static void apercentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo *pInfo = getAPerctInfo(pCtx); - - double v = 0; - GET_TYPED_DATA(v, double, pCtx->inputType, pData); - - tHistogramAdd(&pInfo->pHisto, v); - - SET_VAL(pCtx, 1, 1); - pResInfo->hasResult = DATA_SET_FLAG; -} - static void apercentile_func_merge(SQLFunctionCtx *pCtx) { SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx); @@ -3213,60 +2633,6 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, numOfElem, 1); } -static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - double(*param)[3] = pInfo->mat; - - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - int32_t *p = pData; - LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - int8_t *p = pData; - LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey); - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *p = pData; - LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - int64_t *p = pData; - LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey); - break; - } - case TSDB_DATA_TYPE_FLOAT: { - float *p = pData; - LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - double *p = pData; - LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey); - break; - } - default: - qError("error data type in leastsquare function:%d", pCtx->inputType); - }; - - SET_VAL(pCtx, 1, 1); - pInfo->num += 1; - - if (pInfo->num > 0) { - pResInfo->hasResult = DATA_SET_FLAG; - } -} - static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { // no data in query SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); @@ -3304,25 +2670,23 @@ static void date_col_output_function(SQLFunctionCtx *pCtx) { *(int64_t *)(pCtx->pOutput) = pCtx->startTs; } -static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_t index) { - date_col_output_function(pCtx); -} - static void col_project_function(SQLFunctionCtx *pCtx) { // the number of output rows should not affect the final number of rows, so set it to be 0 if (pCtx->numOfParams == 2) { return; } + + // only one row is required. if (pCtx->param[0].i64 == 1) { SET_VAL(pCtx, pCtx->size, 1); } else { INC_INIT_VAL(pCtx, pCtx->size); } - char *pData = GET_INPUT_DATA_LIST(pCtx); if (pCtx->order == TSDB_ORDER_ASC) { - memcpy(pCtx->pOutput, pData, (size_t) pCtx->size * pCtx->inputBytes); + int32_t numOfRows = (pCtx->param[0].i64 == 1)? 1:pCtx->size; + memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes); } else { for(int32_t i = 0; i < pCtx->size; ++i) { memcpy(pCtx->pOutput + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, @@ -3331,22 +2695,6 @@ static void col_project_function(SQLFunctionCtx *pCtx) { } } -static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - if (pCtx->numOfParams == 2) { // the number of output rows should not affect the final number of rows, so set it to be 0 - return; - } - - // only one output - if (pCtx->param[0].i64 == 1 && pResInfo->numOfRes >= 1) { - return; - } - - INC_INIT_VAL(pCtx, 1); - char *pData = GET_INPUT_DATA(pCtx, index); - memcpy(pCtx->pOutput, pData, pCtx->inputBytes); -} - /** * only used for tag projection query in select clause * @param pCtx @@ -3368,13 +2716,6 @@ static void tag_project_function(SQLFunctionCtx *pCtx) { } } -static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { - INC_INIT_VAL(pCtx, 1); - - tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->tag.nType, true); - pCtx->pOutput += pCtx->outputBytes; -} - /** * used in group by clause. when applying group by tags, the tags value is * assign by using tag function. @@ -3393,11 +2734,6 @@ static void tag_function(SQLFunctionCtx *pCtx) { } } -static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) { - SET_VAL(pCtx, 1, 1); - tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true); -} - static void copy_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); @@ -3793,61 +3129,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { } } -static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { - char *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - // the output start from the second source element - if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is set - GET_RES_INFO(pCtx)->numOfRes += 1; - } - - int32_t step = 1/*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; - - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: { - if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet - pCtx->param[1].nType = pCtx->inputType; - pCtx->param[1].i64 = *(int32_t *)pData; - } else { - *(int32_t *)pCtx->pOutput = *(int32_t *)pData - (int32_t)pCtx->param[1].i64; - pCtx->param[1].i64 = *(int32_t *)pData; - *(int64_t *)pCtx->ptsOutputBuf = GET_TS_DATA(pCtx, index); - } - break; - }; - case TSDB_DATA_TYPE_BIGINT: { - DIFF_IMPL(pCtx, pData, int64_t); - break; - }; - case TSDB_DATA_TYPE_DOUBLE: { - DIFF_IMPL(pCtx, pData, double); - break; - }; - case TSDB_DATA_TYPE_FLOAT: { - DIFF_IMPL(pCtx, pData, float); - break; - }; - case TSDB_DATA_TYPE_SMALLINT: { - DIFF_IMPL(pCtx, pData, int16_t); - break; - }; - case TSDB_DATA_TYPE_TINYINT: { - DIFF_IMPL(pCtx, pData, int8_t); - break; - }; - default: - qError("error input type"); - } - - if (GET_RES_INFO(pCtx)->numOfRes > 0) { - pCtx->pOutput += pCtx->outputBytes * step; - pCtx->ptsOutputBuf = (char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * step; - } -} - char *getArithColumnData(void *param, const char* name, int32_t colId) { SArithmeticSupport *pSupport = (SArithmeticSupport *)param; @@ -3870,16 +3151,6 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { arithmeticTreeTraverse(sas->pExprInfo->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData); } -static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { - INC_INIT_VAL(pCtx, 1); - SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz; - - sas->offset = index; - arithmeticTreeTraverse(sas->pExprInfo->pExpr, 1, pCtx->pOutput, sas, pCtx->order, getArithColumnData); - - pCtx->pOutput += pCtx->outputBytes; -} - #define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \ { \ type *inputData = (type *)data; \ @@ -3998,49 +3269,6 @@ static void spread_function(SQLFunctionCtx *pCtx) { } } -static void spread_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SET_VAL(pCtx, 1, 1); - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - double val = 0.0; - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - val = GET_INT8_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - val = GET_INT16_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - val = GET_INT32_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT || pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) { - val = (double)(GET_INT64_VAL(pData)); - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - val = GET_DOUBLE_VAL(pData); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - val = GET_FLOAT_VAL(pData); - } - - // keep the result data in output buffer, not in the intermediate buffer - if (val > pInfo->max) { - pInfo->max = val; - } - - if (val < pInfo->min) { - pInfo->min = val; - } - - pResInfo->hasResult = DATA_SET_FLAG; - pInfo->hasResult = DATA_SET_FLAG; - - if (pCtx->stableQuery) { - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SSpreadInfo)); - } -} - /* * here we set the result value back to the intermediate buffer, to apply the finalize the function * the final result is generated in spread_function_finalizer @@ -4393,26 +3621,6 @@ static void twa_function(SQLFunctionCtx *pCtx) { } } -static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - int32_t notNullElems = twa_function_impl(pCtx, index, 1); - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - - SET_VAL(pCtx, notNullElems, 1); - - if (notNullElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; - } - - if (pCtx->stableQuery) { - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo)); - } -} - /* * To copy the input to interResBuf to avoid the input buffer space be over writen * by next input data. The TWA function only applies to each table, so no merge procedure @@ -4590,23 +3798,6 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { pResInfo->hasResult = DATA_SET_FLAG; } -static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - STSCompInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - STSBuf *pTSbuf = pInfo->pTSBuf; - - tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64, &pCtx->tag, pData, TSDB_KEYSIZE); - SET_VAL(pCtx, pCtx->size, 1); - - pResInfo->hasResult = DATA_SET_FLAG; -} - static void ts_comp_finalize(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -4736,46 +3927,6 @@ static void rate_function(SQLFunctionCtx *pCtx) { } } -static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - // NOTE: keep the intermediate result into the interResultBuf - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - TSKEY *primaryKey = GET_TS_LIST(pCtx); - - double v = 0; - GET_TYPED_DATA(v, double, pCtx->inputType, pData); - - if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { - pRateInfo->firstValue = v; - pRateInfo->firstKey = primaryKey[index]; - } - - if (INT64_MIN == pRateInfo->lastValue) { - pRateInfo->lastValue = v; - } else if (v < pRateInfo->lastValue) { - pRateInfo->correctionValue += pRateInfo->lastValue; - } - - pRateInfo->lastValue = v; - pRateInfo->lastKey = primaryKey[index]; - - SET_VAL(pCtx, 1, 1); - - // set has result flag - pRateInfo->hasResult = DATA_SET_FLAG; - pResInfo->hasResult = DATA_SET_FLAG; - - // keep the data into the final output buffer for super table query since this execution may be the last one - if (pCtx->stableQuery) { - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); - } -} - static void rate_func_copy(SQLFunctionCtx *pCtx) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); @@ -4793,7 +3944,7 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) { return; } - *(double*) pCtx->pOutput = do_calc_rate(pRateInfo, TSDB_TICK_PER_SECOND(pCtx->param[0].i64)); + *(double*) pCtx->pOutput = do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].i64)); // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; @@ -4846,39 +3997,6 @@ static void irate_function(SQLFunctionCtx *pCtx) { } } -static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { - void *pData = GET_INPUT_DATA(pCtx, index); - if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { - return; - } - - // NOTE: keep the intermediate result into the interResultBuf - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - TSKEY *primaryKey = GET_TS_LIST(pCtx); - - double v = 0; - GET_TYPED_DATA(v, double, pCtx->inputType, pData); - - pRateInfo->firstKey = pRateInfo->lastKey; - pRateInfo->firstValue = pRateInfo->lastValue; - - pRateInfo->lastValue = v; - pRateInfo->lastKey = primaryKey[index]; - -// qDebug("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey); - SET_VAL(pCtx, 1, 1); - - // set has result flag - pRateInfo->hasResult = DATA_SET_FLAG; - pResInfo->hasResult = DATA_SET_FLAG; - - // keep the data into the final output buffer for super table query since this execution may be the last one - if (pCtx->stableQuery) { - memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SRateInfo)); - } -} - void blockInfo_func(SQLFunctionCtx* pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); @@ -5047,8 +4165,7 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { * function compatible list. * tag and ts are not involved in the compatibility check * - * 1. functions that are not simultaneously present with any other functions. e.g., - * diff/ts_z/top/bottom + * 1. functions that are not simultaneously present with any other functions. e.g., diff/ts_z/top/bottom * 2. functions that are only allowed to be present only with same functions. e.g., last_row, interp * 3. functions that are allowed to be present with other functions. * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last... @@ -5062,7 +4179,7 @@ int32_t functionCompatList[] = { // tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, stddev_dst, interp rate irate 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, // tid_tag, blk_info - 6, 7 + 6, 7 }; SAggFunctionInfo aAggs[] = {{ @@ -5073,7 +4190,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO, function_setup, count_function, - count_function_f, doFinalizer, count_func_merge, countRequired, @@ -5086,7 +4202,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO, function_setup, sum_function, - sum_function_f, function_finalizer, sum_func_merge, statisRequired, @@ -5099,7 +4214,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO, function_setup, avg_function, - avg_function_f, avg_finalizer, avg_func_merge, statisRequired, @@ -5112,7 +4226,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, min_func_setup, min_function, - min_function_f, function_finalizer, min_func_merge, statisRequired, @@ -5125,7 +4238,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, max_func_setup, max_function, - max_function_f, function_finalizer, max_func_merge, statisRequired, @@ -5138,7 +4250,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, function_setup, stddev_function, - stddev_function_f, stddev_finalizer, noop1, dataBlockRequired, @@ -5151,7 +4262,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, percentile_function_setup, percentile_function, - percentile_function_f, percentile_finalizer, noop1, dataBlockRequired, @@ -5164,7 +4274,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE, apercentile_function_setup, apercentile_function, - apercentile_function_f, apercentile_finalizer, apercentile_func_merge, dataBlockRequired, @@ -5177,7 +4286,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, first_function, - first_function_f, function_finalizer, noop1, firstFuncRequired, @@ -5190,7 +4298,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, last_function, - last_function_f, function_finalizer, noop1, lastFuncRequired, @@ -5204,7 +4311,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, last_row_function, - noop2, last_row_finalizer, last_dist_func_merge, dataBlockRequired, @@ -5218,7 +4324,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SELECTIVITY, top_bottom_function_setup, top_function, - top_function_f, top_bottom_func_finalizer, top_func_merge, dataBlockRequired, @@ -5232,7 +4337,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SELECTIVITY, top_bottom_function_setup, bottom_function, - bottom_function_f, top_bottom_func_finalizer, bottom_func_merge, dataBlockRequired, @@ -5245,7 +4349,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO, spread_function_setup, spread_function, - spread_function_f, spread_function_finalizer, spread_func_merge, countRequired, @@ -5258,7 +4361,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, twa_function_setup, twa_function, - twa_function_f, twa_function_finalizer, twa_function_copy, dataBlockRequired, @@ -5271,7 +4373,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF, leastsquares_function_setup, leastsquares_function, - leastsquares_function_f, leastsquares_finalizer, noop1, dataBlockRequired, @@ -5284,7 +4385,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, date_col_output_function, - date_col_output_function_f, doFinalizer, copy_function, noDataRequired, @@ -5297,7 +4397,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, function_setup, noop1, - noop2, doFinalizer, copy_function, dataBlockRequired, @@ -5310,7 +4409,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO, function_setup, tag_function, - noop2, doFinalizer, copy_function, noDataRequired, @@ -5323,7 +4421,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, ts_comp_function_setup, ts_comp_function, - ts_comp_function_f, ts_comp_finalize, copy_function, dataBlockRequired, @@ -5336,7 +4433,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO, function_setup, tag_function, - tag_function_f, doFinalizer, copy_function, noDataRequired, @@ -5349,7 +4445,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_NEED_TS, function_setup, col_project_function, - col_project_function_f, doFinalizer, copy_function, dataBlockRequired, @@ -5362,7 +4457,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_MO, function_setup, tag_project_function, - tag_project_function_f, doFinalizer, copy_function, noDataRequired, @@ -5375,7 +4469,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, function_setup, arithmetic_function, - arithmetic_function_f, doFinalizer, copy_function, dataBlockRequired, @@ -5388,7 +4481,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, diff_function_setup, diff_function, - diff_function_f, doFinalizer, noop1, dataBlockRequired, @@ -5402,7 +4494,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, first_dist_function, - first_dist_function_f, function_finalizer, first_dist_func_merge, firstDistFuncRequired, @@ -5415,7 +4506,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, last_dist_function, - last_dist_function_f, function_finalizer, last_dist_func_merge, lastDistFuncRequired, @@ -5428,7 +4518,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE, function_setup, stddev_dst_function, - stddev_dst_function_f, stddev_dst_finalizer, stddev_dst_merge, dataBlockRequired, @@ -5441,7 +4530,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS , function_setup, interp_function, - do_sum_f, // todo filter handle doFinalizer, copy_function, dataBlockRequired, @@ -5454,7 +4542,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, rate_function_setup, rate_function, - rate_function_f, rate_finalizer, rate_func_copy, dataBlockRequired, @@ -5467,7 +4554,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, rate_function_setup, irate_function, - irate_function_f, rate_finalizer, rate_func_copy, dataBlockRequired, @@ -5480,7 +4566,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE, function_setup, noop1, - noop2, noop1, noop1, dataBlockRequired, @@ -5492,7 +4577,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, deriv_function_setup, deriv_function, - noop2, doFinalizer, noop1, dataBlockRequired, @@ -5505,7 +4589,6 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE, function_setup, blockInfo_func, - noop2, blockinfo_func_finalizer, block_func_merge, dataBlockRequired, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 69b17504d3..97a6cf807c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -161,7 +161,7 @@ static void setResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResul int32_t numOfCols, int32_t* rowCellInfoOffset); void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); +static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); @@ -309,7 +309,7 @@ static bool isProjQuery(SQueryAttr *pQueryAttr) { return true; } -static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) { +static bool hasNull(SColIndex* pColIndex, SDataStatis *pStatis) { if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { return false; } @@ -708,12 +708,13 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) { SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - bool hasPrev = pCtx[0].preAggVals.isSet; + bool hasAggregates = pCtx[0].preAggVals.isSet; for (int32_t k = 0; k < numOfOutput; ++k) { - pCtx[k].size = forwardStep; + pCtx[k].size = forwardStep; pCtx[k].startTs = pWin->skey; + // keep it temprarily char* start = pCtx[k].pInput; int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1); @@ -725,20 +726,18 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx pCtx[k].ptsList = &tsCol[pos]; } - int32_t functionId = pCtx[k].functionId; - // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { pCtx[k].preAggVals.isSet = false; } - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { + aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); } // restore it - pCtx[k].preAggVals.isSet = hasPrev; + pCtx[k].preAggVals.isSet = hasAggregates; pCtx[k].pInput = start; } } @@ -847,9 +846,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in } } -// window start key interpolation - - static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock, int32_t rowIndex) { if (pDataBlock == NULL) { @@ -975,10 +971,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - int32_t functionId = pCtx[k].functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { pCtx[k].startTs = startTs;// this can be set during create the struct - aAggs[functionId].xFunction(&pCtx[k]); + aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); } } } @@ -1287,6 +1282,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn return; } + int64_t* tsList = NULL; + SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0); + if (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + tsList = (int64_t*) pFirstColData->pData; + } + + STimeWindow w = TSWINDOW_INITIALIZER; + + int32_t num = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { char* val = ((char*)pColInfoData->pData) + bytes * j; if (isNull(val, type)) { @@ -1294,33 +1298,59 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn } // Compare with the previous row of this column, and do not set the output buffer again if they are identical. - if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) { - if (pInfo->prevData == NULL) { - pInfo->prevData = malloc(bytes); - } - + if (pInfo->prevData == NULL) { + pInfo->prevData = malloc(bytes); memcpy(pInfo->prevData, val, bytes); + num++; + continue; + } - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); + if (IS_VAR_DATA_TYPE(type)) { + int32_t len = varDataLen(val); + if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) { + num++; + continue; } - - int32_t ret = - setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } else { + if (memcmp(pInfo->prevData, val, bytes) == 0) { + num++; + continue; } } - - // todo opt perf - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - pInfo->binfo.pCtx[k].size = 1; - int32_t functionId = pInfo->binfo.pCtx[k].functionId; - if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j); - } + if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, + bytes); } + + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, + item->groupIndex); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + + doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); + + num = 1; + memcpy(pInfo->prevData, val, bytes); + } + + if (num > 0) { + char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num); + memcpy(pInfo->prevData, val, bytes); + + if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, + bytes); + } + + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, + item->groupIndex); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + + doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); } } @@ -1394,9 +1424,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { - int64_t v = -1; - GET_TYPED_DATA(v, int64_t, type, pData); - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(type)) { if (pResultRow->key == NULL) { pResultRow->key = malloc(varDataTLen(pData)); varDataCopy(pResultRow->key, pData); @@ -1404,6 +1432,9 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); } } else { + int64_t v = -1; + GET_TYPED_DATA(v, int64_t, type, pData); + pResultRow->win.skey = v; pResultRow->win.ekey = v; } @@ -1419,7 +1450,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic // not assign result buffer yet, add new result buffer, TODO remove it char* d = pData; int16_t len = bytes; - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(type)) { d = varDataVal(pData); len = varDataLen(pData); } @@ -1461,11 +1492,12 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD return -1; } -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) { +static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // in case of timestamp column, always generated results. + int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TS) { return true; } @@ -1505,7 +1537,7 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde pCtx->preAggVals.isSet = false; } - pCtx->hasNull = hasNullRv(pColIndex, pStatis); + pCtx->hasNull = hasNull(pColIndex, pStatis); // set the statistics data for primary time stamp column if (pCtx->functionId == TSDB_FUNC_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { @@ -3478,6 +3510,7 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, return 0; } +// TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function. void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -4683,8 +4716,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); pInfo->resultRowFactor = - (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, - false)); + (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx @@ -5256,6 +5288,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); } + static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -6268,7 +6301,7 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { return true; } -static UNUSED_FUNC bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pExpr, int32_t numOfOutput, +static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pExpr, int32_t numOfOutput, SColumnInfo* pTagCols, void* pMsg) { int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags; if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { @@ -6453,6 +6486,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pExprMsg->resType = htons(pExprMsg->resType); pExprMsg->resBytes = htons(pExprMsg->resBytes); + pExprMsg->interBytes = htonl(pExprMsg->interBytes); pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); @@ -6660,41 +6694,41 @@ _cleanup: return code; } - int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) { - if (filterNum <= 0) { - return TSDB_CODE_SUCCESS; - } - - *dst = calloc(filterNum, sizeof(*src)); - if (*dst == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - memcpy(*dst, src, sizeof(*src) * filterNum); - - for (int32_t i = 0; i < filterNum; i++) { - if ((*dst)[i].filterstr && dst[i]->len > 0) { - void *pz = calloc(1, (size_t)(*dst)[i].len + 1); - - if (pz == NULL) { - if (i == 0) { - free(*dst); - } else { - freeColumnFilterInfo(*dst, i); - } - - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - memcpy(pz, (void *)src->pz, (size_t)src->len + 1); - - (*dst)[i].pz = (int64_t)pz; - } - } - +int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) { + if (filterNum <= 0) { return TSDB_CODE_SUCCESS; } + *dst = calloc(filterNum, sizeof(*src)); + if (*dst == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + memcpy(*dst, src, sizeof(*src) * filterNum); + + for (int32_t i = 0; i < filterNum; i++) { + if ((*dst)[i].filterstr && dst[i]->len > 0) { + void *pz = calloc(1, (size_t)(*dst)[i].len + 1); + + if (pz == NULL) { + if (i == 0) { + free(*dst); + } else { + freeColumnFilterInfo(*dst, i); + } + + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + memcpy(pz, (void *)src->pz, (size_t)src->len + 1); + + (*dst)[i].pz = (int64_t)pz; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) { qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); @@ -6753,8 +6787,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp for (int32_t i = 0; i < numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; - memset(pExprs[i].base.param, 0, sizeof(tVariant) * tListLen(pExprs[i].base.param)); + memset(pExprs[i].base.param, 0, sizeof(tVariant) * tListLen(pExprs[i].base.param)); for (int32_t j = 0; j < pExprMsg[i]->numOfParams; ++j) { tVariantAssign(&pExprs[i].base.param[j], &pExprMsg[i]->param[j]); } @@ -6829,6 +6863,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp return TSDB_CODE_QRY_INVALID_MSG; } + // todo remove it if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes, &pExprs[i].base.interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { tfree(pExprs); diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index 124e76e85c..507431f536 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -654,53 +654,91 @@ if $data31 != @20-03-27 05:10:19.000@ then return -1 endi -#sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2; -#if $rows != 40 then -# return -1 -#endi -# -#if $data01 != 1.000000000 then -# return -1 -#endi -#if $data02 != t1 then -# return -1 -#endi -#if $data03 != 1 then -# return -1 -#endi -#if $data04 != 1 then -# return -1 -#endi -# -#if $data11 != 1.000000000 then -# return -1 -#endi -#if $data12 != t1 then -# return -1 -#endi -#if $data13 != 1 then -# return -1 -#endi -#if $data14 != 1 then -# return -1 -#endi -# -#sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2 limit 1; -#if $rows != 2 then -# return -1 -#endi -# -#if $data11 != 1.000000000 then -# return -1 -#endi -#if $data12 != t2 then -# return -1 -#endi -#if $data13 != 1 then -# return -1 -#endi -#if $data14 != 2 then -# return -1 -#endi +print ===============> +sql select stddev(c),c from st where t2=1 or t2=2 group by c; +if $rows != 4 then + return -1 +endi + +if $data00 != 0.000000000 then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data10 != 0.000000000 then + return -1 +endi + +if $data11 != 2 then + return -1 +endi + +if $data20 != 0.000000000 then + return -1 +endi + +if $data21 != 3 then + return -1 +endi + +if $data30 != 0.000000000 then + return -1 +endi + +if $data31 != 4 then + return -1 +endi + +sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2; +if $rows != 40 then + return -1 +endi + +if $data01 != 1.000000000 then + return -1 +endi +if $data02 != t1 then + return -1 +endi +if $data03 != 1 then + return -1 +endi +if $data04 != 1 then + return -1 +endi + +if $data11 != 1.000000000 then + return -1 +endi +if $data12 != t1 then + return -1 +endi +if $data13 != 1 then + return -1 +endi +if $data14 != 1 then + return -1 +endi + +sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2 limit 1; +if $rows != 2 then + return -1 +endi + +if $data11 != 1.000000000 then + return -1 +endi +if $data12 != t2 then + return -1 +endi +if $data13 != 1 then + return -1 +endi +if $data14 != 2 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/general/parser/having.sim b/tests/script/general/parser/having.sim index ddafdd7329..a8d2102bef 100644 --- a/tests/script/general/parser/having.sim +++ b/tests/script/general/parser/having.sim @@ -1,6 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c walLevel -v 0 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2 system sh/exec.sh -n dnode1 -s start diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index 545e19edec..5f71138966 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -63,4 +63,3 @@ run general/parser/between_and.sim run general/parser/last_cache.sim run general/parser/nestquery.sim run general/parser/precision_ns.sim -