diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 723e5bf9bc..f366277e27 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -69,8 +69,9 @@ typedef struct SJoinSubquerySupporter { int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); -void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); -void tscDestroyDataBlock(STableDataBlocks* pDataBlock); +void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); +void tscDestroyDataBlock(STableDataBlocks* pDataBlock); +void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset); @@ -133,9 +134,8 @@ SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FI SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); -void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo); -void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size); -void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src); +void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); +void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); @@ -235,11 +235,6 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid); -TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int), - void* param, void** taos); - -void sortRemoveDuplicates(STableDataBlocks* dataBuf); - void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); bool hasMoreVnodesToTry(SSqlObj *pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 59cba3105f..89bed123c4 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -22,20 +22,28 @@ extern "C" { #include "os.h" -#include "qsqlparser.h" -#include "qsqltype.h" -#include "qtsbuf.h" #include "taos.h" #include "taosdef.h" #include "taosmsg.h" #include "tarray.h" #include "tglobal.h" -#include "tutil.h" #include "tsqlfunction.h" +#include "tutil.h" + +#include "qsqlparser.h" +#include "qsqltype.h" +#include "qtsbuf.h" #include "queryExecutor.h" // forward declaration struct SSqlInfo; +struct SLocalReducer; + +// data source from sql string or from file +enum { + DATA_FROM_SQL_STRING = 1, + DATA_FROM_DATA_FILE = 2, +}; typedef SCMSTableVgroupRspMsg SVgroupsInfo; @@ -47,20 +55,23 @@ typedef struct STableComInfo { } STableComInfo; typedef struct STableMeta { - //super table if it is created according to super table, otherwise, tableInfo is used - union { struct STableMeta* pSTable; STableComInfo tableInfo; }; - uint8_t tableType; - int16_t sversion; + // super table if it is created according to super table, otherwise, tableInfo is used + union { + struct STableMeta *pSTable; + STableComInfo tableInfo; + }; + uint8_t tableType; + int16_t sversion; SCMVgroupInfo vgroupInfo; - int32_t sid; // the index of one table in a virtual node - uint64_t uid; // unique id of a table - SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info + int32_t sid; // the index of one table in a virtual node + uint64_t uid; // unique id of a table + SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info } STableMeta; typedef struct STableMetaInfo { - STableMeta * pTableMeta; // table meta, cached in client side and acquried by name - SVgroupsInfo* vgroupList; - + STableMeta * pTableMeta; // table meta, cached in client side and acquried by name + SVgroupsInfo *vgroupList; + /* * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion @@ -73,16 +84,16 @@ typedef struct STableMetaInfo { /* the structure for sql function in select clause */ typedef struct SSqlExpr { - char aliasName[TSDB_COL_NAME_LEN]; // as aliasName - SColIndex colInfo; - int64_t uid; // refactor use the pointer - int16_t functionId; // function id in aAgg array - int16_t resType; // return value type - int16_t resBytes; // length of return value - int16_t interResBytes; // inter result buffer size - int16_t numOfParams; // argument value of each function - tVariant param[3]; // parameters are not more than 3 - int32_t offset; // sub result column value of arithmetic expression. + char aliasName[TSDB_COL_NAME_LEN]; // as aliasName + SColIndex colInfo; + int64_t uid; // refactor use the pointer + int16_t functionId; // function id in aAgg array + int16_t resType; // return value type + int16_t resBytes; // length of return value + int16_t interResBytes; // inter result buffer size + int16_t numOfParams; // argument value of each function + tVariant param[3]; // parameters are not more than 3 + int32_t offset; // sub result column value of arithmetic expression. } SSqlExpr; typedef struct SColumnIndex { @@ -92,14 +103,14 @@ typedef struct SColumnIndex { typedef struct SFieldSupInfo { bool visible; - SArithExprInfo* pArithExprInfo; - SSqlExpr* pSqlExpr; + SArithExprInfo *pArithExprInfo; + SSqlExpr * pSqlExpr; } SFieldSupInfo; typedef struct SFieldInfo { - int16_t numOfOutput; // number of column in result - SArray* pFields; // SArray - SArray* pSupportInfo; // SArray + int16_t numOfOutput; // number of column in result + SArray *pFields; // SArray + SArray *pSupportInfo; // SArray } SFieldInfo; typedef struct SColumn { @@ -108,11 +119,9 @@ typedef struct SColumn { SColumnFilterInfo *filterInfo; } SColumn; -struct SLocalReducer; - typedef struct SCond { uint64_t uid; - int32_t len; // length of tag query condition data + int32_t len; // length of tag query condition data char * cond; } SCond; @@ -139,7 +148,7 @@ typedef struct STagCond { SJoinInfo joinInfo; // for different table, the query condition must be seperated - SArray* pCond; + SArray *pCond; } STagCond; typedef struct SParamInfo { @@ -180,7 +189,7 @@ typedef struct STableDataBlocks { SParamInfo *params; } STableDataBlocks; -typedef struct SDataBlockList { // todo remove +typedef struct SDataBlockList { // todo remove uint32_t nSize; uint32_t nAlloc; STableDataBlocks **pData; @@ -196,9 +205,9 @@ typedef struct SQueryInfo { int64_t slidingTime; // sliding window in mseconds SSqlGroupbyExpr groupbyExpr; // group by tags info - SArray* colList; // SArray + SArray * colList; // SArray SFieldInfo fieldsInfo; - SArray* exprsInfo; // SArray + SArray * exprsInfo; // SArray SLimitVal limit; SLimitVal slimit; STagCond tagCond; @@ -215,19 +224,13 @@ typedef struct SQueryInfo { int64_t prjOffset; } SQueryInfo; -// data source from sql string or from file -enum { - DATA_FROM_SQL_STRING = 1, - DATA_FROM_DATA_FILE = 2, -}; - typedef struct { int command; uint8_t msgType; union { bool existsCheck; // check if the table exists or not - bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta + bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta int8_t dataSourceType; // load data from file or not }; @@ -244,11 +247,11 @@ typedef struct { int32_t payloadLen; SQueryInfo **pQueryInfo; int32_t numOfClause; - - SDataBlockList *pDataBlocks; // submit data blocks after parsing sql - char * curSql; // current sql, resume position of sql after parsing paused - void * pTableList; // referred table involved in sql - + + SDataBlockList *pDataBlocks; // submit data blocks after parsing sql + char * curSql; // current sql, resume position of sql after parsing paused + void * pTableList; // referred table involved in sql + // for parameter ('?') binding and batch processing int32_t batchSize; int32_t numOfParams; @@ -259,50 +262,48 @@ typedef struct SResRec { int numOfTotal; } SResRec; -struct STSBuf; - typedef struct { - int64_t numOfRows; // num of results in current retrieved - int64_t numOfTotal; // num of total results - int64_t numOfTotalInCurrentClause; // num of total result in current subclause - char * pRsp; - int32_t rspType; - int32_t rspLen; - uint64_t qhandle; - int64_t uid; - int64_t useconds; - int64_t offset; // offset value from vnode during projection query of stable - int32_t row; - int16_t numOfCols; - int16_t precision; - bool completed; - int32_t code; - int32_t numOfGroups; - SResRec * pGroupRec; - char * data; - void ** tsrow; - char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) - SColumnIndex *pColumnIndex; + int64_t numOfRows; // num of results in current retrieved + int64_t numOfTotal; // num of total results + int64_t numOfTotalInCurrentClause; // num of total result in current subclause + char * pRsp; + int32_t rspType; + int32_t rspLen; + uint64_t qhandle; + int64_t uid; + int64_t useconds; + int64_t offset; // offset value from vnode during projection query of stable + int32_t row; + int16_t numOfCols; + int16_t precision; + bool completed; + int32_t code; + int32_t numOfGroups; + SResRec * pGroupRec; + char * data; + void ** tsrow; + char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) + SColumnIndex * pColumnIndex; struct SLocalReducer *pLocalReducer; } SSqlRes; typedef struct STscObj { - void * signature; - void * pTimer; - char mgmtIp[TSDB_USER_LEN]; - uint16_t mgmtPort; - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; - char acctId[TSDB_DB_NAME_LEN]; - char db[TSDB_TABLE_ID_LEN]; - char sversion[TSDB_VERSION_LEN]; - char writeAuth : 1; - char superAuth : 1; - struct SSqlObj *pSql; - struct SSqlObj *pHb; - struct SSqlObj *sqlList; + void * signature; + void * pTimer; + char mgmtIp[TSDB_USER_LEN]; + uint16_t mgmtPort; + char user[TSDB_USER_LEN]; + char pass[TSDB_KEY_LEN]; + char acctId[TSDB_DB_NAME_LEN]; + char db[TSDB_TABLE_ID_LEN]; + char sversion[TSDB_VERSION_LEN]; + char writeAuth : 1; + char superAuth : 1; + struct SSqlObj * pSql; + struct SSqlObj * pHb; + struct SSqlObj * sqlList; struct SSqlStream *streamList; - pthread_mutex_t mutex; + pthread_mutex_t mutex; } STscObj; typedef struct SSqlObj { @@ -310,23 +311,23 @@ typedef struct SSqlObj { STscObj *pTscObj; void (*fp)(); void (*fetchFp)(); - void * param; - uint32_t ip; - short vnode; - int64_t stime; - uint32_t queryId; - void * pStream; - void * pSubscription; - char * sqlstr; - char retry; - char maxRetry; - SRpcIpSet ipList; - char freed : 4; - char listed : 4; - tsem_t rspSem; - SSqlCmd cmd; - SSqlRes res; - uint8_t numOfSubs; + void * param; + uint32_t ip; + short vnode; + int64_t stime; + uint32_t queryId; + void * pStream; + void * pSubscription; + char * sqlstr; + char retry; + char maxRetry; + SRpcIpSet ipList; + char freed : 4; + char listed : 4; + tsem_t rspSem; + SSqlCmd cmd; + SSqlRes res; + uint8_t numOfSubs; struct SSqlObj **pSubs; struct SSqlObj * prev, *next; } SSqlObj; @@ -360,13 +361,10 @@ typedef struct SSqlStream { } SSqlStream; int32_t tscInitRpc(const char *user, const char *secret); +void tscInitMsgsFp(); -// tscSql API int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); -void tscInitMsgsFp(); -extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); - void tscProcessMsgFromServer(SRpcMsg *rpcMsg); int tscProcessSql(SSqlObj *pSql); @@ -379,12 +377,8 @@ int tscProcessLocalCmd(SSqlObj *pSql); int tscCfgDynamicOptions(char *msg); int taos_retrieve(TAOS_RES *res); -/* - * transfer function for metric query in stream computing, the function need to be change - * before send query message to vnode - */ -int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo); -void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo); +int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); +void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); @@ -414,10 +408,13 @@ void tscFreeSqlObj(SSqlObj *pObj); void tscCloseTscObj(STscObj *pObj); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); +TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), + void *param, void **taos); + +void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); -void tscKillMetricQuery(SSqlObj *pSql); +void tscKillSTableQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(STscObj *pObj); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); @@ -426,20 +423,22 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); -void tscQueueAsyncFreeResult(SSqlObj *pSql); -int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo); -char* tscGetResultColumnChr(SSqlRes* pRes, SQueryInfo* pQueryInfo, int32_t column); +void tscQueueAsyncFreeResult(SSqlObj *pSql); +int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); +char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column); -extern void * pVnodeConn; -extern void * pTscMgmtConn; -extern void * tscCacheHandle; -extern int slaveIndex; -extern void * tscTmr; -extern void * tscQhandle; -extern int tscKeepConn[]; -extern int tsInsertHeadSize; -extern int tscNumOfThreads; -extern SRpcIpSet tscMgmtIpSet; +extern void * pVnodeConn; +extern void * pTscMgmtConn; +extern void * tscCacheHandle; +extern int slaveIndex; +extern void * tscTmr; +extern void * tscQhandle; +extern int tscKeepConn[]; +extern int tsInsertHeadSize; +extern int tscNumOfThreads; +extern SRpcIpSet tscMgmtIpSet; + +extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 7357da5d17..99d20de48b 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -488,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { */ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - tscTansformSQLFunctionForSTableQuery(pQueryInfo); + tscTansformSQLFuncForSTableQuery(pQueryInfo); tscIncStreamExecutionCount(pSql->pStream); } else { tscTrace("%p get tableMeta successfully", pSql); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 6482b00654..8272de7d16 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -296,7 +296,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { int32_t rowLen = tscBuildMeterSchemaResultFields(pSql, NUM_OF_DESCRIBE_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, note_field_length); - tscFieldInfoCalOffset(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); return tscSetValueToResObj(pSql, rowLen); } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 254ad1cb9f..cab7e15023 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -613,7 +613,7 @@ static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, in } // data block is disordered, sort it in ascending order -void sortRemoveDuplicates(STableDataBlocks *dataBuf) { +void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) { SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; // size is less than the total size, since duplicated rows may be removed yet. diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c7be16c457..8b7cea356f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1258,7 +1258,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel * transfer sql functions that need secondary merge into another format * in dealing with metric queries such as: count/first/last */ - tscTansformSQLFunctionForSTableQuery(pQueryInfo); + tscTansformSQLFuncForSTableQuery(pQueryInfo); if (hasUnsupportFunctionsForSTableQuery(pQueryInfo)) { return TSDB_CODE_INVALID_SQL; @@ -2242,7 +2242,7 @@ bool validateIpAddress(const char* ip, size_t size) { return ipAddr != INADDR_NONE; } -int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { +int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { @@ -2282,7 +2282,7 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { } /* transfer the field-info back to original input format */ -void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) { +void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return; @@ -2547,7 +2547,7 @@ void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) { if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); } else { - tscFieldInfoCalOffset(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); } } @@ -2828,7 +2828,7 @@ static int32_t extractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnIndex* pIn const char* msg1 = "non binary column not support like operator"; const char* msg2 = "binary column not support this operator"; - SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex); + SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex); SColumnFilterInfo* pColFilter = NULL; /* diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 8b1ea1f328..7ad3c08034 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -168,7 +168,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; - pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; +// pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 6631add297..bd2e6e9019 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -261,8 +261,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->pCtx = (SQLFunctionCtx *)calloc(size, sizeof(SQLFunctionCtx)); pReducer->rowSize = pMemBuffer[0]->nElemSize; - tscRestoreSQLFunctionForMetricQuery(pQueryInfo); - tscFieldInfoCalOffset(pQueryInfo); + tscRestoreSQLFuncForSTableQuery(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); if (pReducer->rowSize > pMemBuffer[0]->pageSize) { assert(false); // todo fixed row size is larger than the minimum page size; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ec4d2a927f..332d56c1b1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -458,7 +458,7 @@ int tscProcessSql(SSqlObj *pSql) { return doProcessSql(pSql); } -void tscKillMetricQuery(SSqlObj *pSql) { +void tscKillSTableQuery(SSqlObj *pSql) { SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -2202,7 +2202,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes); } - tscFieldInfoCalOffset(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); tfree(pTableMeta); return 0; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 6855ea8295..f70beab166 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -964,7 +964,7 @@ void taos_stop_query(TAOS_RES *res) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - tscKillMetricQuery(pSql); + tscKillSTableQuery(pSql); return; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 4802bb293a..bae1c0f86a 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -86,7 +86,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; } - tscTansformSQLFunctionForSTableQuery(pQueryInfo); + tscTansformSQLFuncForSTableQuery(pQueryInfo); // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 46e859c221..00a4c8c448 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -305,7 +305,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); pQueryInfo->exprsInfo = tscSqlExprCopy(pSupporter->exprsInfo, pSupporter->uid, false); - tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); + tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); pSupporter->fieldsInfo.numOfOutput = 0; @@ -321,7 +321,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); - tscFieldInfoCalOffset(pNewQueryInfo); + tscFieldInfoUpdateOffset(pNewQueryInfo); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); @@ -859,7 +859,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu tscColumnListAssign(pSupporter->colList, pNewQueryInfo->colList, 0); pSupporter->exprsInfo = tscSqlExprCopy(pNewQueryInfo->exprsInfo, pSupporter->uid, false); - tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); + tscFieldInfoCopy(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7de2f0ce70..d0fbc4c373 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -475,6 +475,7 @@ SDataBlockList* tscCreateBlockArrayList() { if (pDataBlockArrayList == NULL) { return NULL; } + pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK; pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc); if (pDataBlockArrayList->pData == NULL) { @@ -716,7 +717,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi } SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; - sortRemoveDuplicates(pOneTableBlock); + tscSortRemoveDataBlockDupRows(pOneTableBlock); char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); @@ -838,7 +839,7 @@ SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FI return taosArrayInsert(pFieldInfo->pSupportInfo, index, &info); } -void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) { +void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); @@ -869,26 +870,7 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { } } -void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size) { - if (src == NULL) { - return; - } - - if (size <= 0) { - tscFieldInfoCopyAll(dst, src); - } else { // only copy the required column - for (int32_t i = 0; i < size; ++i) { - assert(indexList[i] >= 0 && indexList[i] <= src->numOfOutput); - TAOS_FIELD* p = taosArrayGet(src->pFields, indexList[i]); - SFieldSupInfo* pInfo = taosArrayGet(src->pSupportInfo, indexList[i]); - - SFieldSupInfo* pInfo1 = tscFieldInfoAppend(dst, p); - *pInfo1 = *pInfo; - } - } -} - -void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) { +void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) { dst->numOfOutput = src->numOfOutput; taosArrayCopy(dst->pFields, src->pFields); @@ -1818,20 +1800,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo); - if (numOfOutput > 0) { - int32_t* indexList = calloc(1, numOfOutput * sizeof(int32_t)); + if (numOfOutput > 0) { // todo refactor to extract method size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - - for (int32_t i = 0, j = 0; i < numOfExprs; ++i) { + SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; + + for (int32_t i = 0; i < numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + if (pExpr->uid == uid) { - indexList[j++] = i; + TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i); + SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i); + + SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p); + *pInfo1 = *pInfo; } } - tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutput); - free(indexList); - // make sure the the sqlExpr for each fields is correct // todo handle the agg arithmetic expression for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { @@ -1930,9 +1914,7 @@ bool tscIsUpdateQuery(STscObj* pObj) { SSqlCmd* pCmd = &pObj->pSql->cmd; return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || - TSDB_SQL_USE_DB == pCmd->command) - ? 1 - : 0; + TSDB_SQL_USE_DB == pCmd->command); } int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index caaae9e988..3b5784986e 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -364,7 +364,7 @@ static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCta bool find = false; int32_t pos = 0; - for (int pos = 0; pos < pStable->vgLen; ++pos) { + for (pos = 0; pos < pStable->vgLen; ++pos) { if (pStable->vgList[pos] == 0) break; if (pStable->vgList[pos] == pCtable->vgId) { find = true; @@ -1134,7 +1134,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; for (int32_t i = 0; i < numOfCols; ++i) { - strcpy(pSchema->name, pTable->schema[i].name); + strncpy(pSchema->name, pTable->schema[i].name, TSDB_TABLE_ID_LEN); pSchema->type = pTable->schema[i].type; pSchema->bytes = htons(pTable->schema[i].bytes); pSchema->colId = htons(pTable->schema[i].colId); @@ -1154,7 +1154,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); pMeta->tableType = pTable->info.type; pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); - strcpy(pMeta->tableId, pTable->info.tableId); + strncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN); SRpcMsg rpcRsp = { .handle = pMsg->thandle,