[td-171] refactor codes

This commit is contained in:
hjxilinx 2020-04-23 14:00:46 +08:00
parent 79f3d77852
commit 5e221d7231
14 changed files with 168 additions and 192 deletions

View File

@ -69,8 +69,9 @@ typedef struct SJoinSubquerySupporter {
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks); STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset); uint32_t offset);
@ -133,9 +134,8 @@ SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FI
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index); SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size); void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src);
void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
@ -235,11 +235,6 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid);
TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int),
void* param, void** taos);
void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreVnodesToTry(SSqlObj *pSql);

View File

@ -22,20 +22,28 @@ extern "C" {
#include "os.h" #include "os.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tarray.h" #include "tarray.h"
#include "tglobal.h" #include "tglobal.h"
#include "tutil.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tutil.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "queryExecutor.h" #include "queryExecutor.h"
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
struct SLocalReducer;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef SCMSTableVgroupRspMsg SVgroupsInfo; typedef SCMSTableVgroupRspMsg SVgroupsInfo;
@ -47,19 +55,22 @@ typedef struct STableComInfo {
} STableComInfo; } STableComInfo;
typedef struct STableMeta { typedef struct STableMeta {
//super table if it is created according to super table, otherwise, tableInfo is used // super table if it is created according to super table, otherwise, tableInfo is used
union { struct STableMeta* pSTable; STableComInfo tableInfo; }; union {
uint8_t tableType; struct STableMeta *pSTable;
int16_t sversion; STableComInfo tableInfo;
};
uint8_t tableType;
int16_t sversion;
SCMVgroupInfo vgroupInfo; SCMVgroupInfo vgroupInfo;
int32_t sid; // the index of one table in a virtual node int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta; } STableMeta;
typedef struct STableMetaInfo { typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SVgroupsInfo* vgroupList; SVgroupsInfo *vgroupList;
/* /*
* 1. keep the vnode index during the multi-vnode super table projection query * 1. keep the vnode index during the multi-vnode super table projection query
@ -73,16 +84,16 @@ typedef struct STableMetaInfo {
/* the structure for sql function in select clause */ /* the structure for sql function in select clause */
typedef struct SSqlExpr { typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo; SColIndex colInfo;
int64_t uid; // refactor use the pointer int64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array int16_t functionId; // function id in aAgg array
int16_t resType; // return value type int16_t resType; // return value type
int16_t resBytes; // length of return value int16_t resBytes; // length of return value
int16_t interResBytes; // inter result buffer size int16_t interResBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression. int32_t offset; // sub result column value of arithmetic expression.
} SSqlExpr; } SSqlExpr;
typedef struct SColumnIndex { typedef struct SColumnIndex {
@ -92,14 +103,14 @@ typedef struct SColumnIndex {
typedef struct SFieldSupInfo { typedef struct SFieldSupInfo {
bool visible; bool visible;
SArithExprInfo* pArithExprInfo; SArithExprInfo *pArithExprInfo;
SSqlExpr* pSqlExpr; SSqlExpr * pSqlExpr;
} SFieldSupInfo; } SFieldSupInfo;
typedef struct SFieldInfo { typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result int16_t numOfOutput; // number of column in result
SArray* pFields; // SArray<TAOS_FIELD> SArray *pFields; // SArray<TAOS_FIELD>
SArray* pSupportInfo; // SArray<SFieldSupInfo> SArray *pSupportInfo; // SArray<SFieldSupInfo>
} SFieldInfo; } SFieldInfo;
typedef struct SColumn { typedef struct SColumn {
@ -108,11 +119,9 @@ typedef struct SColumn {
SColumnFilterInfo *filterInfo; SColumnFilterInfo *filterInfo;
} SColumn; } SColumn;
struct SLocalReducer;
typedef struct SCond { typedef struct SCond {
uint64_t uid; uint64_t uid;
int32_t len; // length of tag query condition data int32_t len; // length of tag query condition data
char * cond; char * cond;
} SCond; } SCond;
@ -139,7 +148,7 @@ typedef struct STagCond {
SJoinInfo joinInfo; SJoinInfo joinInfo;
// for different table, the query condition must be seperated // for different table, the query condition must be seperated
SArray* pCond; SArray *pCond;
} STagCond; } STagCond;
typedef struct SParamInfo { typedef struct SParamInfo {
@ -180,7 +189,7 @@ typedef struct STableDataBlocks {
SParamInfo *params; SParamInfo *params;
} STableDataBlocks; } STableDataBlocks;
typedef struct SDataBlockList { // todo remove typedef struct SDataBlockList { // todo remove
uint32_t nSize; uint32_t nSize;
uint32_t nAlloc; uint32_t nAlloc;
STableDataBlocks **pData; STableDataBlocks **pData;
@ -196,9 +205,9 @@ typedef struct SQueryInfo {
int64_t slidingTime; // sliding window in mseconds int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray* colList; // SArray<SColumn*> SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
SArray* exprsInfo; // SArray<SSqlExpr*> SArray * exprsInfo; // SArray<SSqlExpr*>
SLimitVal limit; SLimitVal limit;
SLimitVal slimit; SLimitVal slimit;
STagCond tagCond; STagCond tagCond;
@ -215,19 +224,13 @@ typedef struct SQueryInfo {
int64_t prjOffset; int64_t prjOffset;
} SQueryInfo; } SQueryInfo;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef struct { typedef struct {
int command; int command;
uint8_t msgType; uint8_t msgType;
union { union {
bool existsCheck; // check if the table exists or not bool existsCheck; // check if the table exists or not
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not int8_t dataSourceType; // load data from file or not
}; };
@ -245,9 +248,9 @@ typedef struct {
SQueryInfo **pQueryInfo; SQueryInfo **pQueryInfo;
int32_t numOfClause; int32_t numOfClause;
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
char * curSql; // current sql, resume position of sql after parsing paused char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql void * pTableList; // referred table involved in sql
// for parameter ('?') binding and batch processing // for parameter ('?') binding and batch processing
int32_t batchSize; int32_t batchSize;
@ -259,50 +262,48 @@ typedef struct SResRec {
int numOfTotal; int numOfTotal;
} SResRec; } SResRec;
struct STSBuf;
typedef struct { typedef struct {
int64_t numOfRows; // num of results in current retrieved int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp; char * pRsp;
int32_t rspType; int32_t rspType;
int32_t rspLen; int32_t rspLen;
uint64_t qhandle; uint64_t qhandle;
int64_t uid; int64_t uid;
int64_t useconds; int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable int64_t offset; // offset value from vnode during projection query of stable
int32_t row; int32_t row;
int16_t numOfCols; int16_t numOfCols;
int16_t precision; int16_t precision;
bool completed; bool completed;
int32_t code; int32_t code;
int32_t numOfGroups; int32_t numOfGroups;
SResRec * pGroupRec; SResRec * pGroupRec;
char * data; char * data;
void ** tsrow; void ** tsrow;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex *pColumnIndex; SColumnIndex * pColumnIndex;
struct SLocalReducer *pLocalReducer; struct SLocalReducer *pLocalReducer;
} SSqlRes; } SSqlRes;
typedef struct STscObj { typedef struct STscObj {
void * signature; void * signature;
void * pTimer; void * pTimer;
char mgmtIp[TSDB_USER_LEN]; char mgmtIp[TSDB_USER_LEN];
uint16_t mgmtPort; uint16_t mgmtPort;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
char acctId[TSDB_DB_NAME_LEN]; char acctId[TSDB_DB_NAME_LEN];
char db[TSDB_TABLE_ID_LEN]; char db[TSDB_TABLE_ID_LEN];
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
struct SSqlObj *pSql; struct SSqlObj * pSql;
struct SSqlObj *pHb; struct SSqlObj * pHb;
struct SSqlObj *sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STscObj; } STscObj;
typedef struct SSqlObj { typedef struct SSqlObj {
@ -310,23 +311,23 @@ typedef struct SSqlObj {
STscObj *pTscObj; STscObj *pTscObj;
void (*fp)(); void (*fp)();
void (*fetchFp)(); void (*fetchFp)();
void * param; void * param;
uint32_t ip; uint32_t ip;
short vnode; short vnode;
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * pStream; void * pStream;
void * pSubscription; void * pSubscription;
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
SRpcIpSet ipList; SRpcIpSet ipList;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
uint8_t numOfSubs; uint8_t numOfSubs;
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj * prev, *next; struct SSqlObj * prev, *next;
} SSqlObj; } SSqlObj;
@ -360,13 +361,10 @@ typedef struct SSqlStream {
} SSqlStream; } SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret); int32_t tscInitRpc(const char *user, const char *secret);
void tscInitMsgsFp();
// tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgsFp();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg); void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
@ -379,12 +377,8 @@ int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg); int tscCfgDynamicOptions(char *msg);
int taos_retrieve(TAOS_RES *res); int taos_retrieve(TAOS_RES *res);
/* int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
* transfer function for metric query in stream computing, the function need to be change void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
* before send query message to vnode
*/
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes); void tscDestroyResPointerInfo(SSqlRes *pRes);
@ -414,10 +408,13 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj); void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj); bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
@ -426,20 +423,22 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
void tscQueueAsyncFreeResult(SSqlObj *pSql); void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char* tscGetResultColumnChr(SSqlRes* pRes, SQueryInfo* pQueryInfo, int32_t column); char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column);
extern void * pVnodeConn; extern void * pVnodeConn;
extern void * pTscMgmtConn; extern void * pTscMgmtConn;
extern void * tscCacheHandle; extern void * tscCacheHandle;
extern int slaveIndex; extern int slaveIndex;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
extern int tsInsertHeadSize; extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SRpcIpSet tscMgmtIpSet; extern SRpcIpSet tscMgmtIpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);

View File

@ -488,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
*/ */
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream); tscIncStreamExecutionCount(pSql->pStream);
} else { } else {
tscTrace("%p get tableMeta successfully", pSql); tscTrace("%p get tableMeta successfully", pSql);

View File

@ -296,7 +296,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
int32_t rowLen = int32_t rowLen =
tscBuildMeterSchemaResultFields(pSql, NUM_OF_DESCRIBE_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, note_field_length); tscBuildMeterSchemaResultFields(pSql, NUM_OF_DESCRIBE_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, note_field_length);
tscFieldInfoCalOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
return tscSetValueToResObj(pSql, rowLen); return tscSetValueToResObj(pSql, rowLen);
} }

View File

@ -613,7 +613,7 @@ static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, in
} }
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
void sortRemoveDuplicates(STableDataBlocks *dataBuf) { void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet. // size is less than the total size, since duplicated rows may be removed yet.

View File

@ -1258,7 +1258,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
* transfer sql functions that need secondary merge into another format * transfer sql functions that need secondary merge into another format
* in dealing with metric queries such as: count/first/last * in dealing with metric queries such as: count/first/last
*/ */
tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscTansformSQLFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pQueryInfo)) { if (hasUnsupportFunctionsForSTableQuery(pQueryInfo)) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
@ -2242,7 +2242,7 @@ bool validateIpAddress(const char* ip, size_t size) {
return ipAddr != INADDR_NONE; return ipAddr != INADDR_NONE;
} }
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
@ -2282,7 +2282,7 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
} }
/* transfer the field-info back to original input format */ /* transfer the field-info back to original input format */
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo* pQueryInfo) { void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return; return;
@ -2547,7 +2547,7 @@ void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { if (QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
tscFieldInfoUpdateOffsetForInterResult(pQueryInfo); tscFieldInfoUpdateOffsetForInterResult(pQueryInfo);
} else { } else {
tscFieldInfoCalOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
} }
} }
@ -2828,7 +2828,7 @@ static int32_t extractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnIndex* pIn
const char* msg1 = "non binary column not support like operator"; const char* msg1 = "non binary column not support like operator";
const char* msg2 = "binary column not support this operator"; const char* msg2 = "binary column not support this operator";
SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex); SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex);
SColumnFilterInfo* pColFilter = NULL; SColumnFilterInfo* pColFilter = NULL;
/* /*

View File

@ -168,7 +168,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; // pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);

View File

@ -261,8 +261,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->pCtx = (SQLFunctionCtx *)calloc(size, sizeof(SQLFunctionCtx)); pReducer->pCtx = (SQLFunctionCtx *)calloc(size, sizeof(SQLFunctionCtx));
pReducer->rowSize = pMemBuffer[0]->nElemSize; pReducer->rowSize = pMemBuffer[0]->nElemSize;
tscRestoreSQLFunctionForMetricQuery(pQueryInfo); tscRestoreSQLFuncForSTableQuery(pQueryInfo);
tscFieldInfoCalOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
if (pReducer->rowSize > pMemBuffer[0]->pageSize) { if (pReducer->rowSize > pMemBuffer[0]->pageSize) {
assert(false); // todo fixed row size is larger than the minimum page size; assert(false); // todo fixed row size is larger than the minimum page size;

View File

@ -458,7 +458,7 @@ int tscProcessSql(SSqlObj *pSql) {
return doProcessSql(pSql); return doProcessSql(pSql);
} }
void tscKillMetricQuery(SSqlObj *pSql) { void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
@ -2202,7 +2202,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes); pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes);
} }
tscFieldInfoCalOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
tfree(pTableMeta); tfree(pTableMeta);
return 0; return 0;

View File

@ -964,7 +964,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
tscKillMetricQuery(pSql); tscKillSTableQuery(pSql);
return; return;
} }

View File

@ -86,7 +86,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} }
tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscTansformSQLFuncForSTableQuery(pQueryInfo);
// failed to get meter/metric meta, retry in 10sec. // failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -305,7 +305,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
pQueryInfo->exprsInfo = tscSqlExprCopy(pSupporter->exprsInfo, pSupporter->uid, false); pQueryInfo->exprsInfo = tscSqlExprCopy(pSupporter->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
pSupporter->fieldsInfo.numOfOutput = 0; pSupporter->fieldsInfo.numOfOutput = 0;
@ -321,7 +321,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tscFieldInfoCalOffset(pNewQueryInfo); tscFieldInfoUpdateOffset(pNewQueryInfo);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
@ -859,7 +859,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscColumnListAssign(pSupporter->colList, pNewQueryInfo->colList, 0); tscColumnListAssign(pSupporter->colList, pNewQueryInfo->colList, 0);
pSupporter->exprsInfo = tscSqlExprCopy(pNewQueryInfo->exprsInfo, pSupporter->uid, false); pSupporter->exprsInfo = tscSqlExprCopy(pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); tscFieldInfoCopy(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);

View File

@ -475,6 +475,7 @@ SDataBlockList* tscCreateBlockArrayList() {
if (pDataBlockArrayList == NULL) { if (pDataBlockArrayList == NULL) {
return NULL; return NULL;
} }
pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK; pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK;
pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc); pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc);
if (pDataBlockArrayList->pData == NULL) { if (pDataBlockArrayList->pData == NULL) {
@ -716,7 +717,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
} }
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
sortRemoveDuplicates(pOneTableBlock); tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
@ -838,7 +839,7 @@ SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FI
return taosArrayInsert(pFieldInfo->pSupportInfo, index, &info); return taosArrayInsert(pFieldInfo->pSupportInfo, index, &info);
} }
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) { void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
@ -869,26 +870,7 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
} }
} }
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size) { void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) {
if (src == NULL) {
return;
}
if (size <= 0) {
tscFieldInfoCopyAll(dst, src);
} else { // only copy the required column
for (int32_t i = 0; i < size; ++i) {
assert(indexList[i] >= 0 && indexList[i] <= src->numOfOutput);
TAOS_FIELD* p = taosArrayGet(src->pFields, indexList[i]);
SFieldSupInfo* pInfo = taosArrayGet(src->pSupportInfo, indexList[i]);
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(dst, p);
*pInfo1 = *pInfo;
}
}
}
void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) {
dst->numOfOutput = src->numOfOutput; dst->numOfOutput = src->numOfOutput;
taosArrayCopy(dst->pFields, src->pFields); taosArrayCopy(dst->pFields, src->pFields);
@ -1818,20 +1800,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo); int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);
if (numOfOutput > 0) { if (numOfOutput > 0) { // todo refactor to extract method
int32_t* indexList = calloc(1, numOfOutput * sizeof(int32_t));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
for (int32_t i = 0, j = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->uid == uid) { if (pExpr->uid == uid) {
indexList[j++] = i; TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i);
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p);
*pInfo1 = *pInfo;
} }
} }
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutput);
free(indexList);
// make sure the the sqlExpr for each fields is correct // make sure the the sqlExpr for each fields is correct
// todo handle the agg arithmetic expression // todo handle the agg arithmetic expression
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
@ -1930,9 +1914,7 @@ bool tscIsUpdateQuery(STscObj* pObj) {
SSqlCmd* pCmd = &pObj->pSql->cmd; SSqlCmd* pCmd = &pObj->pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
TSDB_SQL_USE_DB == pCmd->command) TSDB_SQL_USE_DB == pCmd->command);
? 1
: 0;
} }
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {

View File

@ -364,7 +364,7 @@ static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCta
bool find = false; bool find = false;
int32_t pos = 0; int32_t pos = 0;
for (int pos = 0; pos < pStable->vgLen; ++pos) { for (pos = 0; pos < pStable->vgLen; ++pos) {
if (pStable->vgList[pos] == 0) break; if (pStable->vgList[pos] == 0) break;
if (pStable->vgList[pos] == pCtable->vgId) { if (pStable->vgList[pos] == pCtable->vgId) {
find = true; find = true;
@ -1134,7 +1134,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name); strncpy(pSchema->name, pTable->schema[i].name, TSDB_TABLE_ID_LEN);
pSchema->type = pTable->schema[i].type; pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes); pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId); pSchema->colId = htons(pTable->schema[i].colId);
@ -1154,7 +1154,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
pMeta->tableType = pTable->info.type; pMeta->tableType = pTable->info.type;
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable);
strcpy(pMeta->tableId, pTable->info.tableId); strncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->thandle, .handle = pMsg->thandle,