commit
478b1a0729
|
@ -168,9 +168,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
|
||||
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); // (uid, tid) + VGID + TAGSIZE
|
||||
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE;
|
||||
*interBytes = *bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -5285,10 +5286,10 @@ SQLAggFuncElem aAggs[] = {{
|
|||
},
|
||||
{
|
||||
// 34
|
||||
"tid_tag", // return table id and the corresponding tags for join match
|
||||
"tid_tag", // return table id and the corresponding tags for join match and subscribe
|
||||
TSDB_FUNC_TID_TAG,
|
||||
TSDB_FUNC_TID_TAG,
|
||||
TSDB_FUNCSTATE_MO,
|
||||
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE,
|
||||
function_setup,
|
||||
noop1,
|
||||
noop2,
|
||||
|
|
|
@ -1123,7 +1123,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
} else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) {
|
||||
} else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_TBID) {
|
||||
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially
|
||||
if (addExprAndResultField(pQueryInfo, outputIndex, pItem, true) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
|
@ -1468,7 +1468,8 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
|
|||
|
||||
int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) {
|
||||
STableMetaInfo* pTableMetaInfo = NULL;
|
||||
int32_t optr = pItem->pNode->nSQLOptr;
|
||||
|
||||
int32_t optr = pItem->pNode->nSQLOptr;
|
||||
|
||||
const char* msg1 = "not support column types";
|
||||
const char* msg2 = "invalid parameters";
|
||||
|
@ -1476,7 +1477,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
const char* msg4 = "invalid table name";
|
||||
const char* msg5 = "parameter is out of range [0, 100]";
|
||||
const char* msg6 = "function applied to tags not allowed";
|
||||
|
||||
const char* msg7 = "normal table can not apply this function";
|
||||
|
||||
switch (optr) {
|
||||
case TK_COUNT: {
|
||||
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
|
||||
|
@ -1858,13 +1860,68 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
};
|
||||
|
||||
case TK_TBID: {
|
||||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg7);
|
||||
}
|
||||
|
||||
// no parameters or more than one parameter for function
|
||||
if (pItem->pNode->pParam == NULL || pItem->pNode->pParam->nExpr != 1) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
|
||||
}
|
||||
|
||||
tSQLExpr* pParam = pItem->pNode->pParam->a[0].pNode;
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(&pParam->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
|
||||
}
|
||||
|
||||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
// functions can not be applied to normal columns
|
||||
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (index.columnIndex < numOfCols) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
|
||||
}
|
||||
|
||||
index.columnIndex -= numOfCols;
|
||||
|
||||
// 2. valid the column type
|
||||
int16_t colType = pSchema[index.columnIndex].type;
|
||||
if (colType == TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
|
||||
}
|
||||
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
SSchema s = pTagSchema[index.columnIndex];
|
||||
|
||||
int16_t bytes = 0;
|
||||
int16_t type = 0;
|
||||
int16_t inter = 0;
|
||||
|
||||
int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
|
||||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
|
||||
s.type = type;
|
||||
s.bytes = bytes;
|
||||
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
|
||||
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
default:
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
|
|
|
@ -412,10 +412,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et)
|
|||
|
||||
static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
|
||||
SSqlObj* pParentSql = pSupporter->pObj;
|
||||
// SSqlCmd* pCmd = &pSql->cmd;
|
||||
// SSqlRes* pRes = &pSql->res;
|
||||
|
||||
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
|
||||
|
||||
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
|
||||
|
@ -602,21 +598,6 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
// if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
|
||||
// tscError("%p abort query due to other subquery failure. code:%d, global code:%s", pSql, numOfRows,
|
||||
// tstrerror(pSupporter->pState->code));
|
||||
//
|
||||
// quitAllSubquery(pParentSql, pSupporter);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// if (numOfRows < 0) {
|
||||
// tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
|
||||
// pSupporter->pState->code = numOfRows;
|
||||
// quitAllSubquery(pParentSql, pSupporter);
|
||||
// return;
|
||||
// }
|
||||
|
||||
// response of tag retrieve
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
|
||||
if (numOfRows == 0 || pSql->res.completed) {
|
||||
|
|
|
@ -206,19 +206,20 @@
|
|||
#define TK_SUM_IRATE 188
|
||||
#define TK_AVG_RATE 189
|
||||
#define TK_AVG_IRATE 190
|
||||
#define TK_SEMI 191
|
||||
#define TK_NONE 192
|
||||
#define TK_PREV 193
|
||||
#define TK_LINEAR 194
|
||||
#define TK_IMPORT 195
|
||||
#define TK_METRIC 196
|
||||
#define TK_TBNAME 197
|
||||
#define TK_JOIN 198
|
||||
#define TK_METRICS 199
|
||||
#define TK_STABLE 200
|
||||
#define TK_INSERT 201
|
||||
#define TK_INTO 202
|
||||
#define TK_VALUES 203
|
||||
#define TK_TBID 191
|
||||
#define TK_SEMI 192
|
||||
#define TK_NONE 193
|
||||
#define TK_PREV 194
|
||||
#define TK_LINEAR 195
|
||||
#define TK_IMPORT 196
|
||||
#define TK_METRIC 197
|
||||
#define TK_TBNAME 198
|
||||
#define TK_JOIN 199
|
||||
#define TK_METRICS 200
|
||||
#define TK_STABLE 201
|
||||
#define TK_INSERT 202
|
||||
#define TK_INTO 203
|
||||
#define TK_VALUES 204
|
||||
|
||||
#endif
|
||||
|
||||
|
|
|
@ -655,5 +655,5 @@ cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X
|
|||
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
|
||||
LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
|
||||
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
|
||||
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
|
||||
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
|
||||
METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES.
|
||||
|
|
|
@ -245,21 +245,8 @@ enum {
|
|||
BLK_DATA_ALL_NEEDED = 0x3,
|
||||
};
|
||||
|
||||
#define IS_FILE_BLOCK(x) (((x)&BLK_FILE_BLOCK) != 0)
|
||||
|
||||
#define SET_FILE_BLOCK_FLAG(x) \
|
||||
do { \
|
||||
(x) &= (~BLK_CACHE_BLOCK); \
|
||||
(x) |= BLK_FILE_BLOCK; \
|
||||
} while (0);
|
||||
|
||||
#define SET_CACHE_BLOCK_FLAG(x) ((x) = BLK_CACHE_BLOCK | BLK_BLOCK_LOADED);
|
||||
|
||||
#define SET_DATA_BLOCK_NOT_LOADED(x) ((x) &= (~BLK_BLOCK_LOADED));
|
||||
|
||||
#define SET_DATA_BLOCK_LOADED(x) ((x) |= BLK_BLOCK_LOADED);
|
||||
#define IS_DATA_BLOCK_LOADED(x) (((x)&BLK_BLOCK_LOADED) != 0)
|
||||
|
||||
typedef struct STwaInfo {
|
||||
TSKEY lastKey;
|
||||
int8_t hasResult; // flag to denote has value
|
||||
|
@ -291,7 +278,6 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
|
|||
|
||||
bool stableQueryFunctChanged(int32_t funcId);
|
||||
|
||||
|
||||
void resetResultInfo(SResultInfo *pResInfo);
|
||||
void initResultInfo(SResultInfo *pResInfo);
|
||||
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
|
||||
|
|
|
@ -225,6 +225,7 @@ static SKeyword keywordTable[] = {
|
|||
{"TBNAME", TK_TBNAME},
|
||||
{"JOIN", TK_JOIN},
|
||||
{"METRICS", TK_METRICS},
|
||||
{"TBID", TK_TBID},
|
||||
{"STABLE", TK_STABLE},
|
||||
{"FILE", TK_FILE},
|
||||
{"VNODES", TK_VNODES},
|
||||
|
|
|
@ -39,10 +39,10 @@
|
|||
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
|
||||
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
|
||||
|
||||
#define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN)
|
||||
#define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
|
||||
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
|
||||
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
|
||||
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
|
||||
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
|
||||
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
|
||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
|
||||
|
||||
#define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
|
||||
|
||||
|
@ -1101,7 +1101,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
|||
}
|
||||
|
||||
// in the supplementary scan, only the following functions need to be executed
|
||||
if (IS_SUPPLEMENT_SCAN(pRuntimeEnv) &&
|
||||
if (IS_REVERSE_SCAN(pRuntimeEnv) &&
|
||||
!(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST ||
|
||||
functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) {
|
||||
return false;
|
||||
|
@ -2450,8 +2450,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
||||
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
||||
|
||||
TsdbQueryHandleT pQueryHandle =
|
||||
pRuntimeEnv->scanFlag == MASTER_SCAN ? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
|
||||
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
return 0;
|
||||
|
@ -2835,11 +2834,12 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|||
return; // failed to save data in the disk
|
||||
}
|
||||
|
||||
// set current query completed
|
||||
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) {
|
||||
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
|
||||
// return;
|
||||
// }
|
||||
// check if all results has been sent to client
|
||||
int32_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
||||
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
|
||||
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; // set query completed
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
@ -3087,7 +3087,31 @@ void setTableDataInfo(STableQueryInfo *pTableQueryInfo, int32_t tableIndex, int3
|
|||
pTableQueryInfo->tableIndex = tableIndex;
|
||||
}
|
||||
|
||||
static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) {
|
||||
static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
|
||||
if (pTableQueryInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// order has change already!
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
|
||||
} else {
|
||||
assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
||||
}
|
||||
|
||||
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
||||
|
||||
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
||||
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
||||
|
||||
SWITCH_ORDER(pTableQueryInfo->cur.order);
|
||||
pTableQueryInfo->cur.vgroupIndex = -1;
|
||||
}
|
||||
|
||||
static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindowResInfo, int32_t order) {
|
||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i);
|
||||
if (!pStatus->closed) {
|
||||
|
@ -3108,18 +3132,32 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
||||
|
||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
||||
qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
|
||||
|
||||
size_t t = taosArrayGetSize(group);
|
||||
for (int32_t j = 0; j < t; ++j) {
|
||||
SGroupItem *item = taosArrayGet(group, j);
|
||||
updateTableQueryInfoForReverseScan(pQuery, item->info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
void disableFuncInReverseScan(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
int32_t order = pQuery->order.order;
|
||||
|
||||
// group by normal columns and interval query on normal table
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
||||
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
||||
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
|
||||
} else { // for simple result of table query,
|
||||
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
|
||||
int32_t functId = pQuery->pSelectExpr[j].base.functionId;
|
||||
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
|
||||
|
@ -3134,34 +3172,10 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
}
|
||||
|
||||
void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
|
||||
}
|
||||
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
||||
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
|
||||
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
||||
//
|
||||
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
||||
// }
|
||||
} else {
|
||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
||||
}
|
||||
|
||||
pQuery->order.order = (pQuery->order.order) ^ 1u;
|
||||
}
|
||||
|
||||
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SWITCH_ORDER(pRuntimeEnv->pCtx[i]
|
||||
.order); // = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||
SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3358,7 +3372,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
|||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
|
||||
SWITCH_ORDER(pQuery->order.order);
|
||||
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
|
||||
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
|
||||
|
||||
STsdbQueryCond cond = {
|
||||
.twindow = pQuery->window,
|
||||
|
@ -3376,7 +3390,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
|||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
switchCtxOrder(pRuntimeEnv);
|
||||
disableFuncInReverseScan(pRuntimeEnv);
|
||||
disableFuncInReverseScan(pQInfo);
|
||||
}
|
||||
|
||||
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
|
||||
|
@ -3533,28 +3547,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
|
|||
free(pTableQueryInfo);
|
||||
}
|
||||
|
||||
void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
|
||||
if (pTableQueryInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// order has change already!
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
|
||||
} else {
|
||||
assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
||||
}
|
||||
|
||||
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
||||
|
||||
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
||||
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
||||
|
||||
pTableQueryInfo->cur.order = pTableQueryInfo->cur.order ^ 1u;
|
||||
pTableQueryInfo->cur.vgroupIndex = -1;
|
||||
}
|
||||
|
||||
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -3943,9 +3935,16 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
|||
data += bytes * numOfRows;
|
||||
}
|
||||
|
||||
|
||||
// all data returned, set query over
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
setQueryStatus(pQuery, QUERY_OVER);
|
||||
if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) {
|
||||
if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) {
|
||||
setQueryStatus(pQuery, QUERY_OVER);
|
||||
}
|
||||
} else {
|
||||
setQueryStatus(pQuery, QUERY_OVER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4368,7 +4367,8 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
TsdbQueryHandleT *pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
|
||||
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
break;
|
||||
|
@ -4400,7 +4400,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
assert(pTableQueryInfo != NULL && pTableQueryInfo != NULL);
|
||||
assert(pTableQueryInfo != NULL);
|
||||
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
|
||||
|
||||
SDataStatis *pStatis = NULL;
|
||||
|
@ -4759,28 +4759,35 @@ static void createTableQueryInfo(SQInfo *pQInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
|
||||
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
|
||||
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
|
||||
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
|
||||
// }
|
||||
}
|
||||
|
||||
static void doSaveContext(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
|
||||
disableFuncForReverseScan(pQInfo, pQuery->order.order);
|
||||
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u;
|
||||
}
|
||||
|
||||
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
prepareQueryInfoForReverseScan(pQInfo);
|
||||
SWITCH_ORDER(pQuery->order.order);
|
||||
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
|
||||
}
|
||||
|
||||
STsdbQueryCond cond = {
|
||||
.twindow = pQuery->window,
|
||||
.order = pQuery->order.order,
|
||||
.colList = pQuery->colList,
|
||||
.numOfCols = pQuery->numOfCols,
|
||||
};
|
||||
|
||||
// clean unused handle
|
||||
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
||||
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
||||
}
|
||||
|
||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo);
|
||||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
switchCtxOrder(pRuntimeEnv);
|
||||
disableFuncInReverseScan(pQInfo);
|
||||
}
|
||||
|
||||
static void doRestoreContext(SQInfo *pQInfo) {
|
||||
|
@ -4835,8 +4842,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
}
|
||||
|
||||
pQuery->rec.rows += pQuery->rec.rows;
|
||||
|
||||
if (pQuery->rec.rows == 0) {
|
||||
// vnodePrintQueryStatistics(pSupporter);
|
||||
}
|
||||
|
@ -6287,7 +6292,10 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
SGroupItem* item = taosArrayGet(pa, i);
|
||||
|
||||
char* output = pQuery->sdata[0]->data + i * rsize;
|
||||
*(int64_t*) output = item->id.uid; // memory align problem
|
||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
||||
|
||||
output = varDataVal(output);
|
||||
*(int64_t*) output = item->id.uid; // memory align problem, todo serialize
|
||||
output += sizeof(item->id.uid);
|
||||
|
||||
*(int32_t*) output = item->id.tid;
|
||||
|
|
1348
src/query/src/sql.c
1348
src/query/src/sql.c
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue