[td-225] fix bugs in group by normal columns
This commit is contained in:
parent
21d8e66d4b
commit
a3b1dbb845
|
@ -191,14 +191,14 @@ typedef struct SDataBlockList { // todo remove
|
|||
} SDataBlockList;
|
||||
|
||||
typedef struct SQueryInfo {
|
||||
int16_t command; // the command may be different for each subclause, so keep it seperately.
|
||||
uint32_t type; // query/insert/import type
|
||||
char slidingTimeUnit;
|
||||
|
||||
STimeWindow window;
|
||||
int64_t intervalTime; // aggregation time interval
|
||||
int64_t slidingTime; // sliding window in mseconds
|
||||
SSqlGroupbyExpr groupbyExpr; // group by tags info
|
||||
int16_t command; // the command may be different for each subclause, so keep it seperately.
|
||||
uint32_t type; // query/insert/import type
|
||||
char slidingTimeUnit;
|
||||
|
||||
STimeWindow window;
|
||||
int64_t intervalTime; // aggregation time interval
|
||||
int64_t slidingTime; // sliding window in mseconds
|
||||
SSqlGroupbyExpr groupbyExpr; // group by tags info
|
||||
|
||||
SArray * colList; // SArray<SColumn*>
|
||||
SFieldInfo fieldsInfo;
|
||||
|
@ -207,11 +207,11 @@ typedef struct SQueryInfo {
|
|||
SLimitVal slimit;
|
||||
STagCond tagCond;
|
||||
SOrderVal order;
|
||||
int16_t fillType; // interpolate type
|
||||
int16_t fillType; // final result fill type
|
||||
int16_t numOfTables;
|
||||
STableMetaInfo **pTableMetaInfo;
|
||||
struct STSBuf * tsBuf;
|
||||
int64_t * fillVal; // default value for interpolation
|
||||
int64_t * fillVal; // default value for fill
|
||||
char * msg; // pointer to the pCmd->payload to keep error message temporarily
|
||||
int64_t clauseLimit; // limit for current sub clause
|
||||
|
||||
|
@ -222,15 +222,15 @@ typedef struct SQueryInfo {
|
|||
typedef struct {
|
||||
int command;
|
||||
uint8_t msgType;
|
||||
|
||||
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
|
||||
int8_t dataSourceType; // load data from file or not
|
||||
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
|
||||
int8_t dataSourceType; // load data from file or not
|
||||
|
||||
union {
|
||||
int32_t count;
|
||||
int32_t numOfTablesInSubmit;
|
||||
};
|
||||
|
||||
int32_t insertType;
|
||||
int32_t clauseIndex; // index of multiple subclause query
|
||||
int8_t parseFinished;
|
||||
short numOfCols;
|
||||
|
@ -239,14 +239,12 @@ typedef struct {
|
|||
int32_t payloadLen;
|
||||
SQueryInfo **pQueryInfo;
|
||||
int32_t numOfClause;
|
||||
char * curSql; // current sql, resume position of sql after parsing paused
|
||||
void * pTableList; // referred table involved in sql
|
||||
int32_t batchSize; // for parameter ('?') binding and batch processing
|
||||
int32_t numOfParams;
|
||||
|
||||
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
|
||||
char * curSql; // current sql, resume position of sql after parsing paused
|
||||
void * pTableList; // referred table involved in sql
|
||||
|
||||
// for parameter ('?') binding and batch processing
|
||||
int32_t batchSize;
|
||||
int32_t numOfParams;
|
||||
} SSqlCmd;
|
||||
|
||||
typedef struct SResRec {
|
||||
|
@ -316,7 +314,6 @@ typedef struct SSqlObj {
|
|||
SRpcIpSet ipList;
|
||||
char freed : 4;
|
||||
char listed : 4;
|
||||
uint32_t insertType;
|
||||
tsem_t rspSem;
|
||||
SSqlCmd cmd;
|
||||
SSqlRes res;
|
||||
|
|
|
@ -1293,7 +1293,7 @@ static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
minMax_function_f(pCtx, index, 0);
|
||||
|
||||
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pResInfo->hasResult == DATA_SET_FLAG) {
|
||||
if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) {
|
||||
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
|
||||
*flag = DATA_SET_FLAG;
|
||||
}
|
||||
|
@ -1309,7 +1309,7 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
minMax_function_f(pCtx, index, 1);
|
||||
|
||||
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pResInfo->hasResult == DATA_SET_FLAG) {
|
||||
if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) {
|
||||
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
|
||||
*flag = DATA_SET_FLAG;
|
||||
}
|
||||
|
|
|
@ -1314,7 +1314,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
|||
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
|
||||
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pSql->insertType);
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType);
|
||||
|
||||
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
|
||||
if (sToken.type != TK_INTO) {
|
||||
|
@ -1342,7 +1342,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
|
|||
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
|
||||
* the error handle callback function can rightfully restore the user-defined callback function (fp).
|
||||
*/
|
||||
if (initialParse && (pSql->insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||
if (initialParse && (pSql->cmd.insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||
pSql->fetchFp = pSql->fp;
|
||||
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
|
||||
}
|
||||
|
@ -1354,9 +1354,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
SSqlInfo SQLInfo = {0};
|
||||
tSQLParse(&SQLInfo, pSql->sqlstr);
|
||||
|
||||
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
|
||||
ret = tscToSQLCmd(pSql, &SQLInfo);
|
||||
SQLInfoDestroy(&SQLInfo);
|
||||
}
|
||||
|
|
|
@ -451,7 +451,7 @@ static int insertStmtExecute(STscStmt* stmt) {
|
|||
|
||||
pRes->qhandle = 0;
|
||||
|
||||
pSql->insertType = 0;
|
||||
pSql->cmd.insertType = 0;
|
||||
pSql->fetchFp = waitForQueryRsp;
|
||||
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
|
||||
|
||||
|
@ -515,7 +515,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
|||
SSqlRes *pRes = &pSql->res;
|
||||
pSql->param = (void*) pSql;
|
||||
pSql->fp = waitForQueryRsp;
|
||||
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
|
||||
pSql->cmd.insertType = TSDB_QUERY_TYPE_STMT_INSERT;
|
||||
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
||||
tscError("%p failed to malloc payload buffer", pSql);
|
||||
|
|
|
@ -517,6 +517,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
pSql->cmd.parseFinished = true;
|
||||
return TSDB_CODE_SUCCESS; // do not build query message here
|
||||
}
|
||||
|
||||
|
@ -542,6 +543,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
|
||||
}
|
||||
|
||||
pSql->cmd.parseFinished = true;
|
||||
return tscBuildMsg[pCmd->command](pSql, pInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -1185,7 +1185,9 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage
|
|||
int32_t ret = 0; // merge all result by default
|
||||
|
||||
int16_t functionId = pLocalReducer->pCtx[0].functionId;
|
||||
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query
|
||||
|
||||
// todo opt performance
|
||||
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0))) { // column projection query
|
||||
ret = 1; // disable merge procedure
|
||||
} else {
|
||||
tOrderDescriptor *pDesc = pLocalReducer->pDesc;
|
||||
|
|
|
@ -505,10 +505,9 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
|||
|
||||
tsem_init(&pSql->rspSem, 0, 0);
|
||||
|
||||
SSqlInfo SQLInfo = {0};
|
||||
tSQLParse(&SQLInfo, pSql->sqlstr);
|
||||
|
||||
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
|
||||
tscResetSqlCmdObj(&pSql->cmd);
|
||||
|
||||
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
|
||||
if (TSDB_CODE_SUCCESS != ret) {
|
||||
setErrorInfo(pSql, ret, NULL);
|
||||
|
|
|
@ -322,7 +322,7 @@ enum {
|
|||
#define NORMAL_ARITHMETIC 1
|
||||
#define AGG_ARIGHTMEIC 2
|
||||
|
||||
int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
|
||||
SSqlInfo qSQLParse(const char *str);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -373,7 +373,6 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
|||
SPosInfo pos = {-1, -1};
|
||||
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
|
||||
}
|
||||
|
||||
pWindowResInfo->capacity = newCap;
|
||||
}
|
||||
|
||||
|
@ -1566,11 +1565,6 @@ static bool isFirstLastRowQuery(SQuery *pQuery) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC bool notHasQueryTimeRange(SQuery *pQuery) {
|
||||
return (pQuery->window.skey == 0 && pQuery->window.ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
|
||||
}
|
||||
|
||||
static bool needReverseScan(SQuery *pQuery) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||
|
@ -1768,61 +1762,6 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
|
|||
}
|
||||
}
|
||||
|
||||
static UNUSED_FUNC void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t index, char *data) {
|
||||
assert(pCtx->param[index].pz == NULL);
|
||||
|
||||
int32_t len = 0;
|
||||
size_t t = 0;
|
||||
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
t = strlen(data);
|
||||
|
||||
len = t + 1 + TSDB_KEYSIZE;
|
||||
pCtx->param[index].pz = calloc(1, len);
|
||||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||
t = wcslen((const wchar_t *)data);
|
||||
|
||||
len = (t + 1) * TSDB_NCHAR_SIZE + TSDB_KEYSIZE;
|
||||
pCtx->param[index].pz = calloc(1, len);
|
||||
} else {
|
||||
len = TSDB_KEYSIZE * 2;
|
||||
pCtx->param[index].pz = malloc(len);
|
||||
}
|
||||
|
||||
pCtx->param[index].nType = TSDB_DATA_TYPE_BINARY;
|
||||
|
||||
char *z = pCtx->param[index].pz;
|
||||
*(TSKEY *)z = ts;
|
||||
z += TSDB_KEYSIZE;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
*(double *)z = GET_FLOAT_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
*(double *)z = GET_DOUBLE_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
*(int64_t *)z = GET_INT64_VAL(data);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
strncpy(z, data, t);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
wcsncpy((wchar_t *)z, (const wchar_t *)data, t);
|
||||
} break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
pCtx->param[index].nLen = len;
|
||||
}
|
||||
|
||||
static int32_t getInitialPageNum(SQInfo *pQInfo) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
int32_t INITIAL_RESULT_ROWS_VALUE = 16;
|
||||
|
@ -4071,45 +4010,19 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
|||
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
|
||||
}
|
||||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
|
||||
/*
|
||||
* in case of last_row query without query range, we set the query timestamp to be
|
||||
* STable->lastKey. Otherwise, keep the initial query time range unchanged.
|
||||
*/
|
||||
// if (isFirstLastRowQuery(pQuery)) {
|
||||
// if (!normalizeUnBoundLastRowQuery(pQInfo, &interpInfo)) {
|
||||
// sem_post(&pQInfo->dataReady);
|
||||
// pointInterpSupporterDestroy(&interpInfo);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
// }
|
||||
|
||||
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
||||
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
|
||||
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
|
||||
pQuery->slidingTime, pQuery->fillType, pColInfo);
|
||||
}
|
||||
|
||||
|
||||
// todo refactor
|
||||
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupInfo *pSidset) {
|
||||
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
|
||||
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
|
||||
if (pColIndex->flag == TSDB_COL_TAG) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
|
|
@ -26,16 +26,18 @@
|
|||
#include "tstrbuild.h"
|
||||
#include "queryLog.h"
|
||||
|
||||
int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
|
||||
SSqlInfo qSQLParse(const char *pStr) {
|
||||
void *pParser = ParseAlloc(malloc);
|
||||
pSQLInfo->valid = true;
|
||||
|
||||
SSqlInfo sqlInfo = {0};
|
||||
sqlInfo.valid = true;
|
||||
|
||||
int32_t i = 0;
|
||||
while (1) {
|
||||
SSQLToken t0 = {0};
|
||||
|
||||
if (pStr[i] == 0) {
|
||||
Parse(pParser, 0, t0, pSQLInfo);
|
||||
Parse(pParser, 0, t0, &sqlInfo);
|
||||
goto abort_parse;
|
||||
}
|
||||
|
||||
|
@ -49,19 +51,19 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
|
|||
break;
|
||||
}
|
||||
case TK_SEMI: {
|
||||
Parse(pParser, 0, t0, pSQLInfo);
|
||||
Parse(pParser, 0, t0, &sqlInfo);
|
||||
goto abort_parse;
|
||||
}
|
||||
|
||||
case TK_QUESTION:
|
||||
case TK_ILLEGAL: {
|
||||
snprintf(pSQLInfo->pzErrMsg, tListLen(pSQLInfo->pzErrMsg), "unrecognized token: \"%s\"", t0.z);
|
||||
pSQLInfo->valid = false;
|
||||
snprintf(sqlInfo.pzErrMsg, tListLen(sqlInfo.pzErrMsg), "unrecognized token: \"%s\"", t0.z);
|
||||
sqlInfo.valid = false;
|
||||
goto abort_parse;
|
||||
}
|
||||
default:
|
||||
Parse(pParser, t0.type, t0, pSQLInfo);
|
||||
if (pSQLInfo->valid == false) {
|
||||
Parse(pParser, t0.type, t0, &sqlInfo);
|
||||
if (sqlInfo.valid == false) {
|
||||
goto abort_parse;
|
||||
}
|
||||
}
|
||||
|
@ -69,7 +71,7 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
|
|||
|
||||
abort_parse:
|
||||
ParseFree(pParser, free);
|
||||
return 0;
|
||||
return sqlInfo;
|
||||
}
|
||||
|
||||
tSQLExprList *tSQLExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SSQLToken *pToken) {
|
||||
|
|
|
@ -180,7 +180,7 @@ if $data03 != 0 then
|
|||
endi
|
||||
|
||||
print $data04
|
||||
if $data04 != 0.0000 then
|
||||
if $data04 != 0.00000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -201,7 +201,8 @@ if $data13 != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 1.0000 then
|
||||
if $data14 != 1.00000 then
|
||||
print expect 1.00000, actual:$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -345,6 +346,19 @@ if $data94 != 9 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<5 group by c1;
|
||||
if $row != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 800 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select first(c1), last(ts), first(ts), last(c1),sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by tbname,c1;
|
||||
if $row != 160 then
|
||||
return -1
|
||||
|
|
Loading…
Reference in New Issue