add the union support in sql parser: fix some bugs. #1032. [TBASE-1140]
This commit is contained in:
parent
6287be5a59
commit
35a534ec9a
|
@ -208,9 +208,10 @@ typedef struct SDataBlockList {
|
|||
} SDataBlockList;
|
||||
|
||||
typedef struct SQueryInfo {
|
||||
uint16_t type; // query/insert/import type
|
||||
char intervalTimeUnit;
|
||||
|
||||
int16_t command; // the command may be different for each subclause, so keep it seperately.
|
||||
uint16_t type; // query/insert/import type
|
||||
char intervalTimeUnit;
|
||||
|
||||
int64_t etime, stime;
|
||||
int64_t nAggTimeInterval; // aggregation time interval
|
||||
int64_t nSlidingTime; // sliding window in mseconds
|
||||
|
@ -229,6 +230,7 @@ typedef struct SQueryInfo {
|
|||
struct STSBuf * tsBuf;
|
||||
int64_t * defaultVal; // default value for interpolation
|
||||
char * msg; // pointer to the pCmd->payload to keep error message temporarily
|
||||
int64_t clauseLimit; // limit for this sub clause
|
||||
} SQueryInfo;
|
||||
|
||||
// data source from sql string or from file
|
||||
|
@ -251,6 +253,7 @@ typedef struct {
|
|||
union {
|
||||
int32_t count;
|
||||
int32_t numOfTablesInSubmit;
|
||||
int32_t clauseIndex; // index of multiple subclause query
|
||||
};
|
||||
|
||||
short numOfCols;
|
||||
|
|
|
@ -439,8 +439,6 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
|
||||
} else if (numOfRows == 0) { // no data from this vnode anymore
|
||||
if (tscProjectionQueryOnSTable(&pParentSql->cmd, 0)) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
assert(pQueryInfo->numOfTables == 1);
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQ
|
|||
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
|
||||
int8_t type, char* fieldName);
|
||||
static int32_t changeFunctionID(int32_t optr, int16_t* functionId);
|
||||
static int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isMetric);
|
||||
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isMetric);
|
||||
|
||||
static bool validateIpAddress(const char* ip, size_t size);
|
||||
static bool hasUnsupportFunctionsForMetricQuery(SQueryInfo* pQueryInfo);
|
||||
|
@ -82,7 +82,7 @@ static int32_t setSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
|
|||
|
||||
static int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pItem);
|
||||
|
||||
static int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr);
|
||||
static int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql);
|
||||
static int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL);
|
||||
static int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema, int32_t numOfCols);
|
||||
|
||||
|
@ -101,7 +101,7 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
|
|||
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
|
||||
static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
|
||||
static int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuerySql);
|
||||
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* pSql);
|
||||
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql);
|
||||
static int32_t getColumnIndexByNameEx(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
|
||||
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
|
||||
|
@ -518,6 +518,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// set the command/globallimit parameters from the first subclause to the sqlcmd object
|
||||
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0);
|
||||
pCmd->command = pQueryInfo1->command;
|
||||
|
||||
// if there is only one element, the limit of clause is the limit of global result.
|
||||
if (pCmd->numOfClause == 1) {
|
||||
pCmd->globalLimit = pQueryInfo1->clauseLimit;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS; // do not build query message here
|
||||
}
|
||||
|
@ -1051,13 +1060,13 @@ static void extractColumnNameFromString(tSQLExprItem* pItem) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable) {
|
||||
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable) {
|
||||
assert(pSelection != NULL && pCmd != NULL);
|
||||
|
||||
const char* msg1 = "invalid column name/illegal column type in arithmetic expression";
|
||||
const char* msg2 = "functions can not be mixed up";
|
||||
const char* msg3 = "not support query expression";
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
|
||||
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
|
||||
int32_t outputIndex = pQueryInfo->fieldsInfo.numOfOutputCols;
|
||||
|
@ -1141,7 +1150,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, tSQLExprList* pSelection, bool isSTable
|
|||
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
|
||||
if (tscQueryMetricTags(pQueryInfo)) { // local handle the metric tag query
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_TAGS;
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_TAGS;//todo !!!!!!==================================
|
||||
pCmd->count = pMeterMetaInfo->pMeterMeta->numOfColumns; // the number of meter schema, tricky.
|
||||
}
|
||||
|
||||
|
@ -3543,7 +3552,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
|
|||
|
||||
return ret;
|
||||
}
|
||||
int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) {
|
||||
int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql) {
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -3553,8 +3562,6 @@ int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) {
|
|||
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
pQueryInfo->stime = 0;
|
||||
pQueryInfo->etime = INT64_MAX;
|
||||
|
||||
|
@ -4403,9 +4410,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) {
|
|||
return (pQueryInfo->stime == pQueryInfo->etime) && (pQueryInfo->stime != 0);
|
||||
}
|
||||
|
||||
int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuerySql) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
|
||||
int32_t parseLimitClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSqlObj* pSql) {
|
||||
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
|
||||
const char* msg0 = "soffset/offset can not be less than 0";
|
||||
|
@ -4415,17 +4420,18 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer
|
|||
|
||||
// handle the limit offset value, validate the limit
|
||||
pQueryInfo->limit = pQuerySql->limit;
|
||||
pCmd->globalLimit = pQueryInfo->limit.limit;
|
||||
pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
|
||||
// pCmd->globalLimit = pQueryInfo->limit.limit;
|
||||
|
||||
pQueryInfo->slimit = pQuerySql->slimit;
|
||||
|
||||
if (pQueryInfo->slimit.offset < 0 || pQueryInfo->limit.offset < 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
|
||||
}
|
||||
|
||||
if (pQueryInfo->limit.limit == 0) {
|
||||
tscTrace("%p limit 0, no output result", pSql);
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
}
|
||||
|
||||
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
|
||||
|
@ -4435,17 +4441,17 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer
|
|||
}
|
||||
|
||||
if (queryOnTags == true) { // local handle the metric tag query
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_TAGS;
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
|
||||
} else {
|
||||
if (tscProjectionQueryOnSTable(&pSql->cmd, 0) &&
|
||||
(pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->slimit.limit == 0) {
|
||||
tscTrace("%p limit 0, no output result", pSql);
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -4464,11 +4470,11 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer
|
|||
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
|
||||
if (pMeterMetaInfo->pMeterMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfMeters == 0) {
|
||||
tscTrace("%p no table in metricmeta, no output result", pSql);
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
}
|
||||
|
||||
// keep original limitation value in globalLimit
|
||||
pCmd->globalLimit = pQueryInfo->limit.limit;
|
||||
pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
|
||||
} else {
|
||||
if (pQueryInfo->slimit.limit != -1 || pQueryInfo->slimit.offset != 0) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
|
||||
|
@ -5315,12 +5321,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
bool isSTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
|
||||
if (parseSelectClause(&pSql->cmd, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
|
||||
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pQuerySql->pWhere != NULL) { // query condition in stream computing
|
||||
if (parseWhereClause(pSql, &pQuerySql->pWhere) != TSDB_CODE_SUCCESS) {
|
||||
if (parseWhereClause(pQueryInfo, &pQuerySql->pWhere, pSql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
}
|
||||
|
@ -5401,7 +5407,10 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
|
||||
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
|
||||
if (pMeterMetaInfo == NULL) {
|
||||
pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
|
||||
}
|
||||
|
||||
// too many result columns not support order by in query
|
||||
if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
|
||||
|
@ -5424,6 +5433,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
||||
}
|
||||
|
||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||
|
||||
// set all query tables, which are maybe more than one.
|
||||
for (int32_t i = 0; i < pQuerySql->from->nExpr; ++i) {
|
||||
tVariant* pTableItem = &pQuerySql->from->a[i].pVar;
|
||||
|
@ -5464,7 +5475,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
}
|
||||
|
||||
bool isSTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
|
||||
if (parseSelectClause(pCmd, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
|
||||
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -5508,7 +5519,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
|
||||
// set where info
|
||||
if (pQuerySql->pWhere != NULL) {
|
||||
if (parseWhereClause(pSql, &pQuerySql->pWhere) != TSDB_CODE_SUCCESS) {
|
||||
if (parseWhereClause(pQueryInfo, &pQuerySql->pWhere, pSql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -5553,7 +5564,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
pQueryInfo->limit = pQuerySql->limit;
|
||||
|
||||
// temporarily save the original limitation value
|
||||
if ((code = parseLimitClause(pSql, 0, pQuerySql)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = parseLimitClause(pQueryInfo, pQuerySql, pSql)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -760,8 +760,8 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
char * name = NULL;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
SMeterMetaInfo *pMeterMetaInfo = NULL;
|
||||
int16_t type = 0;
|
||||
|
||||
|
@ -1492,14 +1492,14 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
|
|||
* for meter query, simply return the size <= 1k
|
||||
* for metric query, estimate size according to meter tags
|
||||
*/
|
||||
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) {
|
||||
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
||||
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
|
||||
int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
|
||||
|
||||
int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->fieldsInfo.numOfOutputCols;
|
||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
|
||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
|
||||
// meter query without tags values
|
||||
if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
|
||||
|
@ -1564,14 +1564,14 @@ static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfMeters, int32_t vn
|
|||
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
int32_t size = tscEstimateQueryMsgSize(pCmd);
|
||||
int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
|
||||
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||
tscError("%p failed to malloc for query msg", pSql);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
char * pStart = pCmd->payload + tsRpcHeadSize;
|
||||
|
@ -1623,7 +1623,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->ekey = htobe64(pQueryInfo->stime);
|
||||
}
|
||||
|
||||
pQueryMsg->num = htonl(0);
|
||||
pQueryMsg->order = htons(pQueryInfo->order.order);
|
||||
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
|
||||
|
||||
|
@ -2880,29 +2879,6 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
return msgLen;
|
||||
}
|
||||
|
||||
int tscProcessRetrieveRspFromMgmt(SSqlObj *pSql) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
STscObj *pObj = pSql->pTscObj;
|
||||
|
||||
SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)(pRes->pRsp);
|
||||
pRes->numOfRows = htonl(pRetrieve->numOfRows);
|
||||
pRes->precision = htons(pRes->precision);
|
||||
|
||||
pRes->data = pRetrieve->data;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
tscSetResultPointer(pQueryInfo, pRes);
|
||||
|
||||
if (pRes->numOfRows == 0) {
|
||||
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
|
||||
pSql->thandle = NULL;
|
||||
}
|
||||
|
||||
pRes->row = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
|
||||
SMeterMeta *pMeta;
|
||||
SSchema * pSchema;
|
||||
|
@ -3399,8 +3375,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
|
|||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
STscObj *pObj = pSql->pTscObj;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;
|
||||
|
||||
pRes->numOfRows = htonl(pRetrieve->numOfRows);
|
||||
|
@ -3410,7 +3384,8 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
|
|||
pRes->useconds = htobe64(pRetrieve->useconds);
|
||||
pRes->data = pRetrieve->data;
|
||||
|
||||
tscSetResultPointer(tscGetQueryInfoDetail(pCmd, 0), pRes);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
tscSetResultPointer(pQueryInfo, pRes);
|
||||
pRes->row = 0;
|
||||
|
||||
/**
|
||||
|
|
|
@ -388,10 +388,10 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
|
|||
static void **doSetResultRowData(SSqlObj *pSql) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
int32_t num = 0;
|
||||
|
||||
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
|
||||
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
|
||||
|
||||
|
@ -575,7 +575,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
} else if (pRes->row >= pRes->numOfRows) {
|
||||
} else if (pRes->row >= pRes->numOfRows) { // not a join query
|
||||
tscResetForNextRetrieve(pRes);
|
||||
|
||||
if (pCmd->command < TSDB_SQL_LOCAL) {
|
||||
|
@ -587,7 +587,10 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// local reducer has handle this situation
|
||||
/*
|
||||
* local reducer has handle this case,
|
||||
* so no need to add the pRes->numOfRows for metric retrieve
|
||||
*/
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
|
||||
pRes->numOfTotal += pRes->numOfRows;
|
||||
}
|
||||
|
@ -610,7 +613,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
TAOS_ROW rows = taos_fetch_row_impl(res);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
if (rows == NULL) {
|
||||
int32_t k = 1;
|
||||
}
|
||||
while (rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) {
|
||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
|
||||
|
@ -648,6 +653,17 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// current subclause is completed, try the next subclause
|
||||
if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
|
||||
pSql->cmd.command = TSDB_SQL_SELECT;
|
||||
pCmd->clauseIndex++;
|
||||
|
||||
assert(pSql->fp == NULL);
|
||||
tscProcessSql(pSql);
|
||||
|
||||
rows = taos_fetch_row_impl(res);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
|
|
@ -1591,17 +1591,17 @@ SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo* pQueryInfo, int32_t
|
|||
}
|
||||
|
||||
SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
|
||||
if (pCmd->pQueryInfo == NULL) {
|
||||
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
|
||||
|
||||
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < pCmd->numOfClause);
|
||||
return pCmd->pQueryInfo[subClauseIndex];
|
||||
}
|
||||
|
||||
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
assert(subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
|
||||
|
||||
*pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
|
||||
|
||||
|
|
|
@ -506,7 +506,6 @@ typedef struct {
|
|||
uint64_t uid;
|
||||
TSKEY skey;
|
||||
TSKEY ekey;
|
||||
int32_t num;
|
||||
|
||||
int16_t order;
|
||||
int16_t orderColId;
|
||||
|
|
|
@ -229,7 +229,6 @@ typedef struct _qinfo {
|
|||
int killed;
|
||||
struct _qinfo *prev, *next;
|
||||
SQuery query;
|
||||
int num;
|
||||
int totalPoints;
|
||||
int pointsRead;
|
||||
int pointsReturned;
|
||||
|
|
|
@ -637,7 +637,6 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
|||
pQuery->lastKey = pQuery->skey;
|
||||
|
||||
pQInfo->fp = pQueryFunc[pQueryMsg->order];
|
||||
pQInfo->num = pQueryMsg->num;
|
||||
|
||||
if (sem_init(&(pQInfo->dataReady), 0, 0) != 0) {
|
||||
dError("QInfo:%p vid:%d sid:%d meterId:%s, init dataReady sem failed, reason:%s", pQInfo, pMeterObj->vnode,
|
||||
|
@ -737,7 +736,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
|||
pQuery->ekey = pQueryMsg->ekey;
|
||||
|
||||
pQInfo->fp = pQueryFunc[pQueryMsg->order];
|
||||
pQInfo->num = pQueryMsg->num;
|
||||
|
||||
if (sem_init(&(pQInfo->dataReady), 0, 0) != 0) {
|
||||
dError("QInfo:%p vid:%d sid:%d id:%s, init dataReady sem failed, reason:%s", pQInfo, pMetersObj[0]->vnode,
|
||||
|
@ -976,8 +974,6 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
|
|||
pQueryMsg->ekey = htobe64(pQueryMsg->ekey);
|
||||
#endif
|
||||
|
||||
pQueryMsg->num = htonl(pQueryMsg->num);
|
||||
|
||||
pQueryMsg->order = htons(pQueryMsg->order);
|
||||
pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
|
||||
|
||||
|
|
Loading…
Reference in New Issue