Merge pull request #1443 from taosdata/liaohj_2
[TD-32]add super table tag filter support at vnode side.
This commit is contained in:
commit
db78012091
|
@ -177,7 +177,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
|
||||||
|
|
||||||
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
|
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
|
||||||
SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex);
|
SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex);
|
||||||
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
|
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
|
||||||
|
|
||||||
void tscTagCondCopy(STagCond* dest, const STagCond* src);
|
void tscTagCondCopy(STagCond* dest, const STagCond* src);
|
||||||
void tscTagCondRelease(STagCond* pCond);
|
void tscTagCondRelease(STagCond* pCond);
|
||||||
|
|
|
@ -3511,7 +3511,7 @@ int tableNameCompar(const void* lhs, const void* rhs) {
|
||||||
return ret > 0 ? 1 : -1;
|
return ret > 0 ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
|
static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
|
||||||
int16_t tableCondIndex, SStringBuilder* sb) {
|
int16_t tableCondIndex, SStringBuilder* sb) {
|
||||||
const char* msg = "table name too long";
|
const char* msg = "table name too long";
|
||||||
|
|
||||||
|
@ -3740,7 +3740,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsSetMetricQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c);
|
tsSetSTableQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c);
|
||||||
|
|
||||||
doCompactQueryExpr(pExpr);
|
doCompactQueryExpr(pExpr);
|
||||||
tSQLExprDestroy(p1);
|
tSQLExprDestroy(p1);
|
||||||
|
@ -3815,7 +3815,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
|
||||||
// 7. query condition for table name
|
// 7. query condition for table name
|
||||||
pQueryInfo->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
|
pQueryInfo->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
|
||||||
|
|
||||||
ret = setTableCondForMetricQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
|
ret = setTableCondForSTableQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
|
||||||
taosStringBuilderDestroy(&sb);
|
taosStringBuilderDestroy(&sb);
|
||||||
|
|
||||||
if (!validateFilterExpr(pQueryInfo)) {
|
if (!validateFilterExpr(pQueryInfo)) {
|
||||||
|
@ -5444,8 +5444,9 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
|
|
||||||
// two table: the first one is for current table, and the secondary is for the super table.
|
// two table: the first one is for current table, and the secondary is for the super table.
|
||||||
tscAddEmptyMetaInfo(pQueryInfo);
|
if (pQueryInfo->numOfTables < 2) {
|
||||||
assert(pQueryInfo->numOfTables == 2);
|
tscAddEmptyMetaInfo(pQueryInfo);
|
||||||
|
}
|
||||||
|
|
||||||
const int32_t TABLE_INDEX = 0;
|
const int32_t TABLE_INDEX = 0;
|
||||||
const int32_t STABLE_INDEX = 1;
|
const int32_t STABLE_INDEX = 1;
|
||||||
|
|
|
@ -601,13 +601,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
// SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
|
|
||||||
|
|
||||||
if (pQueryInfo->colList.numOfCols <= 0) {
|
if (pQueryInfo->colList.numOfCols <= 0) {
|
||||||
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
|
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pQueryInfo->intervalTime < 0) {
|
||||||
|
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
|
||||||
|
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
char *pStart = pCmd->payload + tsRpcHeadSize;
|
char *pStart = pCmd->payload + tsRpcHeadSize;
|
||||||
|
|
||||||
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
|
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
|
||||||
|
@ -643,8 +652,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
numOfTables = 1;
|
numOfTables = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryMsg->numOfTables = htonl(numOfTables);
|
|
||||||
|
|
||||||
if (pQueryInfo->order.order == TSQL_SO_ASC) {
|
if (pQueryInfo->order.order == TSQL_SO_ASC) {
|
||||||
pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
|
pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
|
||||||
pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
|
pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
|
||||||
|
@ -653,6 +660,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
|
pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pQueryMsg->numOfTables = htonl(numOfTables);
|
||||||
pQueryMsg->order = htons(pQueryInfo->order.order);
|
pQueryMsg->order = htons(pQueryInfo->order.order);
|
||||||
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
|
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
|
||||||
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
|
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
|
||||||
|
@ -662,23 +670,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
|
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
|
||||||
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
|
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
|
||||||
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||||
|
|
||||||
if (pQueryInfo->intervalTime < 0) {
|
|
||||||
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
|
|
||||||
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
|
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||||
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter
|
|
||||||
pQueryMsg->tagLength = 0;
|
|
||||||
} else { // query on super table
|
|
||||||
pQueryMsg->tagLength = htons(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
pQueryMsg->queryType = htons(pQueryInfo->type);
|
pQueryMsg->queryType = htons(pQueryInfo->type);
|
||||||
pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
|
pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
|
||||||
|
@ -742,7 +734,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
bool hasArithmeticFunction = false;
|
bool hasArithmeticFunction = false;
|
||||||
|
|
||||||
SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
|
SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
|
||||||
|
|
||||||
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
|
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
|
||||||
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
|
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||||
|
|
||||||
|
@ -856,6 +847,24 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
|
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pQueryInfo->tagCond.numOfTagCond > 0) {
|
||||||
|
STagCond* pTagCond = &pQueryInfo->tagCond;
|
||||||
|
|
||||||
|
SCond *pCond = tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid);
|
||||||
|
if (pCond != NULL && pCond->cond != NULL) {
|
||||||
|
size_t condLen = strlen(pCond->cond) + 1;
|
||||||
|
|
||||||
|
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
|
||||||
|
if (!ret) {
|
||||||
|
tscError("%p mbs to ucs4 failed:%d", pSql, tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pQueryMsg->tagCondLen = htons(condLen);
|
||||||
|
pMsg += condLen * TSDB_NCHAR_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
msgLen = pMsg - pStart;
|
msgLen = pMsg - pStart;
|
||||||
|
|
||||||
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
|
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
|
||||||
|
|
|
@ -102,7 +102,8 @@ SCond* tsGetSTableQueryCondPos(STagCond* pTagCond, uint64_t uid) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
|
// todo refactor by using SArray
|
||||||
|
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
|
||||||
size_t len = strlen(str);
|
size_t len = strlen(str);
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -474,14 +474,14 @@ typedef struct {
|
||||||
int64_t intervalOffset; // start offset for interval query
|
int64_t intervalOffset; // start offset for interval query
|
||||||
int64_t slidingTime; // value for sliding window
|
int64_t slidingTime; // value for sliding window
|
||||||
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
|
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
|
||||||
int16_t tagLength; // tag length in current query
|
uint16_t tagCondLen; // tag length in current query
|
||||||
int16_t numOfGroupCols; // num of group by columns
|
int16_t numOfGroupCols; // num of group by columns
|
||||||
int16_t orderByIdx;
|
int16_t orderByIdx;
|
||||||
int16_t orderType; // used in group by xx order by xxx
|
int16_t orderType; // used in group by xx order by xxx
|
||||||
uint64_t groupbyTagIds;
|
uint64_t groupbyTagIds;
|
||||||
int64_t limit;
|
int64_t limit;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
int16_t queryType; // denote another query process
|
uint16_t queryType; // denote another query process
|
||||||
int16_t numOfOutputCols; // final output columns numbers
|
int16_t numOfOutputCols; // final output columns numbers
|
||||||
int16_t interpoType; // interpolate type
|
int16_t interpoType; // interpolate type
|
||||||
uint64_t defaultVal; // default value array list
|
uint64_t defaultVal; // default value array list
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <tskiplist.h>
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
|
@ -93,8 +94,7 @@ void tSQLBinaryExprToString(tSQLBinaryExpr *pExpr, char *dst, int32_t *len);
|
||||||
|
|
||||||
void tSQLBinaryExprDestroy(tSQLBinaryExpr **pExprs, void (*fp)(void*));
|
void tSQLBinaryExprDestroy(tSQLBinaryExpr **pExprs, void (*fp)(void*));
|
||||||
|
|
||||||
void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExprs, struct tSkipList *pSkipList, tQueryResultset *result,
|
void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param);
|
||||||
SBinaryFilterSupp *param);
|
|
||||||
|
|
||||||
void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
||||||
char *(*cb)(void *, char *, int32_t));
|
char *(*cb)(void *, char *, int32_t));
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "qast.h"
|
#include "qast.h"
|
||||||
|
#include <tarray.h>
|
||||||
|
#include <tskiplist.h>
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "qsqlparser.h"
|
#include "qsqlparser.h"
|
||||||
#include "qsyntaxtreefunction.h"
|
#include "qsyntaxtreefunction.h"
|
||||||
|
@ -705,126 +707,120 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu
|
||||||
* @param pSchema tag schemas
|
* @param pSchema tag schemas
|
||||||
* @param fp filter callback function
|
* @param fp filter callback function
|
||||||
*/
|
*/
|
||||||
static UNUSED_FUNC void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) {
|
static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, SArray *pResult, SBinaryFilterSupp *param) {
|
||||||
int32_t n = 0;
|
size_t size = taosArrayGetSize(pResult);
|
||||||
for (int32_t i = 0; i < pResult->num; ++i) {
|
|
||||||
void *pItem = pResult->pRes[i];
|
SArray* array = taosArrayInit(size, POINTER_BYTES);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
void *pItem = taosArrayGetP(pResult, i);
|
||||||
|
|
||||||
if (filterItem(pExpr, pItem, param)) {
|
if (filterItem(pExpr, pItem, param)) {
|
||||||
pResult->pRes[n++] = pResult->pRes[i];
|
taosArrayPush(array, &pItem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pResult->num = n;
|
taosArrayCopy(pResult, array);
|
||||||
}
|
}
|
||||||
|
|
||||||
//static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, tSkipList *pSkipList,
|
static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, SArray *pResult, SSkipList *pSkipList,
|
||||||
// SBinaryFilterSupp *param) {
|
SBinaryFilterSupp *param) {
|
||||||
// int32_t n = 0;
|
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
|
||||||
// SSkipListIterator iter = {0};
|
|
||||||
//
|
while (tSkipListIterNext(iter)) {
|
||||||
// int32_t ret = tSkipListIteratorReset(pSkipList, &iter);
|
SSkipListNode *pNode = tSkipListIterGet(iter);
|
||||||
// assert(ret == 0);
|
|
||||||
//
|
if (filterItem(pExpr, pNode, param)) {
|
||||||
// pResult->pRes = calloc(pSkipList->nSize, POINTER_BYTES);
|
taosArrayPush(pResult, SL_GET_NODE_DATA(pNode));
|
||||||
//
|
}
|
||||||
// while (tSkipListIteratorNext(&iter)) {
|
}
|
||||||
// tSkipListNode *pNode = tSkipListIteratorGet(&iter);
|
}
|
||||||
// if (filterItem(pExpr, pNode, param)) {
|
|
||||||
// pResult->pRes[n++] = pNode;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pResult->num = n;
|
|
||||||
//}
|
|
||||||
|
|
||||||
// post-root order traverse syntax tree
|
// post-root order traverse syntax tree
|
||||||
//void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, tSkipList *pSkipList, tQueryResultset *result,
|
void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param) {
|
||||||
// SBinaryFilterSupp *param) {
|
if (pExpr == NULL) {
|
||||||
// if (pExpr == NULL) {
|
return;
|
||||||
// return;
|
}
|
||||||
// }
|
|
||||||
//
|
tSQLSyntaxNode *pLeft = pExpr->pLeft;
|
||||||
// tSQLSyntaxNode *pLeft = pExpr->pLeft;
|
tSQLSyntaxNode *pRight = pExpr->pRight;
|
||||||
// tSQLSyntaxNode *pRight = pExpr->pRight;
|
|
||||||
//
|
// recursive traverse left child branch
|
||||||
// // recursive traverse left child branch
|
if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) {
|
||||||
// if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) {
|
uint8_t weight = pLeft->pExpr->filterOnPrimaryKey + pRight->pExpr->filterOnPrimaryKey;
|
||||||
// uint8_t weight = pLeft->pExpr->filterOnPrimaryKey + pRight->pExpr->filterOnPrimaryKey;
|
|
||||||
//
|
if (weight == 0 && taosArrayGetSize(result) > 0 && pSkipList == NULL) {
|
||||||
// if (weight == 0 && result->num > 0 && pSkipList == NULL) {
|
/**
|
||||||
// /**
|
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
|
||||||
// * Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
|
* Since no index presented, the filter operation is done by scan all elements in the result set.
|
||||||
// * Since no index presented, the filter operation is done by scan all elements in the result set.
|
*
|
||||||
// *
|
* if the query is a high selectivity filter, only small portion of meters are retrieved.
|
||||||
// * if the query is a high selectivity filter, only small portion of meters are retrieved.
|
*/
|
||||||
// */
|
tSQLBinaryTraverseOnResult(pExpr, result, param);
|
||||||
// tSQLBinaryTraverseOnResult(pExpr, result, param);
|
} else if (weight == 0) {
|
||||||
// } else if (weight == 0) {
|
/**
|
||||||
// /**
|
* apply the hierarchical expression to every node in skiplist for find the qualified nodes
|
||||||
// * apply the hierarchical expression to every node in skiplist for find the qualified nodes
|
*/
|
||||||
// */
|
assert(taosArrayGetSize(result) == 0);
|
||||||
|
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
|
||||||
|
} else if (weight == 2 || (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_OR)) {
|
||||||
|
tQueryResultset rLeft = {0};
|
||||||
|
tQueryResultset rRight = {0};
|
||||||
|
|
||||||
|
tSQLBinaryExprTraverse(pLeft->pExpr, pSkipList, &rLeft, param);
|
||||||
|
tSQLBinaryExprTraverse(pRight->pExpr, pSkipList, &rRight, param);
|
||||||
|
|
||||||
|
if (pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) { // CROSS
|
||||||
|
intersect(&rLeft, &rRight, result);
|
||||||
|
} else if (pExpr->nSQLBinaryOptr == TSDB_RELATION_OR) { // or
|
||||||
|
merge(&rLeft, &rRight, result);
|
||||||
|
} else {
|
||||||
|
assert(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(rLeft.pRes);
|
||||||
|
free(rRight.pRes);
|
||||||
|
} else {
|
||||||
|
/*
|
||||||
|
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
|
||||||
|
*
|
||||||
|
* first, we filter results based on the skiplist index, which is the initial filter stage,
|
||||||
|
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
|
||||||
|
*/
|
||||||
|
assert(pExpr->nSQLBinaryOptr == TSDB_RELATION_AND);
|
||||||
|
|
||||||
|
tSQLBinaryExpr *pFirst = NULL;
|
||||||
|
tSQLBinaryExpr *pSecond = NULL;
|
||||||
|
if (pLeft->pExpr->filterOnPrimaryKey == 1) {
|
||||||
|
pFirst = pLeft->pExpr;
|
||||||
|
pSecond = pRight->pExpr;
|
||||||
|
} else {
|
||||||
|
pFirst = pRight->pExpr;
|
||||||
|
pSecond = pLeft->pExpr;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
|
||||||
|
|
||||||
|
// we filter the result based on the skiplist index in the first place
|
||||||
|
tSQLBinaryExprTraverse(pFirst, pSkipList, result, param);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* recursively perform the filter operation based on the initial results,
|
||||||
|
* So, we do not set the skiplist index as a parameter
|
||||||
|
*/
|
||||||
|
tSQLBinaryExprTraverse(pSecond, NULL, result, param);
|
||||||
|
}
|
||||||
|
} else { // column project
|
||||||
|
assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
|
||||||
|
|
||||||
|
param->setupInfoFn(pExpr, param->pExtInfo);
|
||||||
|
if (pSkipList == NULL) {
|
||||||
|
tSQLListTraverseOnResult(pExpr, param->fp, result);
|
||||||
|
} else {
|
||||||
// assert(result->num == 0);
|
// assert(result->num == 0);
|
||||||
// tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
|
// tSQLDoFilterInitialResult(pSkipList, param->fp, pExpr->info, result);
|
||||||
// } else if (weight == 2 || (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_OR)) {
|
}
|
||||||
// tQueryResultset rLeft = {0};
|
}
|
||||||
// tQueryResultset rRight = {0};
|
}
|
||||||
//
|
|
||||||
// tSQLBinaryExprTraverse(pLeft->pExpr, pSkipList, &rLeft, param);
|
|
||||||
// tSQLBinaryExprTraverse(pRight->pExpr, pSkipList, &rRight, param);
|
|
||||||
//
|
|
||||||
// if (pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) { // CROSS
|
|
||||||
// intersect(&rLeft, &rRight, result);
|
|
||||||
// } else if (pExpr->nSQLBinaryOptr == TSDB_RELATION_OR) { // or
|
|
||||||
// merge(&rLeft, &rRight, result);
|
|
||||||
// } else {
|
|
||||||
// assert(false);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// free(rLeft.pRes);
|
|
||||||
// free(rRight.pRes);
|
|
||||||
// } else {
|
|
||||||
// /*
|
|
||||||
// * (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
|
|
||||||
// *
|
|
||||||
// * first, we filter results based on the skiplist index, which is the initial filter stage,
|
|
||||||
// * then, we conduct the secondary filter operation based on the result from the initial filter stage.
|
|
||||||
// */
|
|
||||||
// assert(pExpr->nSQLBinaryOptr == TSDB_RELATION_AND);
|
|
||||||
//
|
|
||||||
// tSQLBinaryExpr *pFirst = NULL;
|
|
||||||
// tSQLBinaryExpr *pSecond = NULL;
|
|
||||||
// if (pLeft->pExpr->filterOnPrimaryKey == 1) {
|
|
||||||
// pFirst = pLeft->pExpr;
|
|
||||||
// pSecond = pRight->pExpr;
|
|
||||||
// } else {
|
|
||||||
// pFirst = pRight->pExpr;
|
|
||||||
// pSecond = pLeft->pExpr;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
|
|
||||||
//
|
|
||||||
// // we filter the result based on the skiplist index in the first place
|
|
||||||
// tSQLBinaryExprTraverse(pFirst, pSkipList, result, param);
|
|
||||||
//
|
|
||||||
// /*
|
|
||||||
// * recursively perform the filter operation based on the initial results,
|
|
||||||
// * So, we do not set the skiplist index as a parameter
|
|
||||||
// */
|
|
||||||
// tSQLBinaryExprTraverse(pSecond, NULL, result, param);
|
|
||||||
// }
|
|
||||||
// } else { // column project
|
|
||||||
// assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
|
|
||||||
//
|
|
||||||
// param->setupInfoFn(pExpr, param->pExtInfo);
|
|
||||||
// if (pSkipList == NULL) {
|
|
||||||
// tSQLListTraverseOnResult(pExpr, param->fp, result);
|
|
||||||
// } else {
|
|
||||||
// assert(result->num == 0);
|
|
||||||
//// tSQLDoFilterInitialResult(pSkipList, param->fp, pExpr->info, result);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
||||||
char *(*getSourceDataBlock)(void *, char *, int32_t)) {
|
char *(*getSourceDataBlock)(void *, char *, int32_t)) {
|
||||||
|
|
|
@ -5310,11 +5310,6 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueryTableMsg->tagLength < 0) {
|
|
||||||
dError("qmsg:%p illegal value of tag length %d", pQueryTableMsg, pQueryTableMsg->tagLength);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5353,7 +5348,8 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra
|
||||||
* @param pExpr
|
* @param pExpr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr) {
|
static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
|
||||||
|
wchar_t** tagCond) {
|
||||||
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
|
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
|
||||||
|
|
||||||
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
|
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
|
||||||
|
@ -5372,7 +5368,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
|
||||||
pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols);
|
pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols);
|
||||||
pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols);
|
pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols);
|
||||||
pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols);
|
pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols);
|
||||||
pQueryTableMsg->tagLength = htons(pQueryTableMsg->tagLength);
|
pQueryTableMsg->tagCondLen = htons(pQueryTableMsg->tagCondLen);
|
||||||
|
|
||||||
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
|
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
|
||||||
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
|
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
|
||||||
|
@ -5428,7 +5424,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
|
||||||
bool hasArithmeticFunction = false;
|
bool hasArithmeticFunction = false;
|
||||||
|
|
||||||
*pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES);
|
*pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES);
|
||||||
|
|
||||||
SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg;
|
SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
|
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
|
||||||
|
@ -5501,6 +5496,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
|
||||||
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
|
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
|
||||||
v[i] = htobe64(v[i]);
|
v[i] = htobe64(v[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMsg += sizeof(int64_t) * pQueryTableMsg->numOfOutputCols;
|
||||||
|
}
|
||||||
|
|
||||||
|
// the tag query condition expression string is located at the end of query msg
|
||||||
|
if (pQueryTableMsg->tagCondLen > 0) {
|
||||||
|
*tagCond = calloc(1, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE);
|
||||||
|
memcpy(*tagCond, pMsg, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
|
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
|
||||||
|
@ -5995,7 +5998,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
|
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
|
||||||
if (pQInfo == NULL) {
|
if ((*pQInfo) == NULL) {
|
||||||
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -6024,6 +6027,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
|
||||||
tsBufNextPos(pTSBuf);
|
tsBufNextPos(pTSBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// filter the qualified
|
||||||
if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
|
if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -6050,7 +6054,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ
|
||||||
|
|
||||||
SArray *pTableIdList = NULL;
|
SArray *pTableIdList = NULL;
|
||||||
SSqlFuncExprMsg** pExprMsg = NULL;
|
SSqlFuncExprMsg** pExprMsg = NULL;
|
||||||
if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg)) != TSDB_CODE_SUCCESS) {
|
wchar_t* tagCond = NULL;
|
||||||
|
|
||||||
|
if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6077,6 +6083,17 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ
|
||||||
goto _query_over;
|
goto _query_over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// super table query
|
||||||
|
if ((pQueryTableMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
|
||||||
|
STableId* id = taosArrayGet(pTableIdList, 0);
|
||||||
|
|
||||||
|
SArray* res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryTableMsg->tagCondLen);
|
||||||
|
if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
goto _query_over;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
|
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
|
||||||
|
|
||||||
_query_over:
|
_query_over:
|
||||||
|
|
|
@ -87,7 +87,21 @@ size_t taosArrayGetSize(const SArray* pArray);
|
||||||
void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
|
void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
* remove data entry of the given index
|
||||||
|
* @param pArray
|
||||||
|
* @param index
|
||||||
|
*/
|
||||||
|
void taosArrayRemove(SArray* pArray, size_t index);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* copy the whole array from source to destination
|
||||||
|
* @param pDst
|
||||||
|
* @param pSrc
|
||||||
|
*/
|
||||||
|
void taosArrayCopy(SArray* pDst, SArray* pSrc);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* destroy array list
|
||||||
* @param pArray
|
* @param pArray
|
||||||
*/
|
*/
|
||||||
void taosArrayDestroy(SArray* pArray);
|
void taosArrayDestroy(SArray* pArray);
|
||||||
|
|
|
@ -128,6 +128,38 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
|
||||||
return dst;
|
return dst;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosArrayRemove(SArray* pArray, size_t index) {
|
||||||
|
assert(index < pArray->size);
|
||||||
|
|
||||||
|
if (index == pArray->size - 1) {
|
||||||
|
taosArrayPop(pArray);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t remain = pArray->size - index - 1;
|
||||||
|
memmove(pArray->pData + index * pArray->elemSize, pArray->pData + (index + 1) * pArray->elemSize, remain * pArray->elemSize);
|
||||||
|
pArray->size -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosArrayCopy(SArray* pDst, SArray* pSrc) {
|
||||||
|
assert(pSrc != NULL && pDst != NULL);
|
||||||
|
|
||||||
|
if (pDst->capacity < pSrc->size) {
|
||||||
|
void* pData = realloc(pDst->pData, pSrc->size * pSrc->elemSize);
|
||||||
|
if (pData == NULL) { // todo handle oom
|
||||||
|
|
||||||
|
} else {
|
||||||
|
pDst->pData = pData;
|
||||||
|
pDst->capacity = pSrc->size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pDst->pData, pSrc->pData, pSrc->elemSize * pSrc->size);
|
||||||
|
pDst->elemSize = pSrc->elemSize;
|
||||||
|
pDst->capacity = pSrc->size;
|
||||||
|
pDst->size = pSrc->size;
|
||||||
|
}
|
||||||
|
|
||||||
void taosArrayDestroy(SArray* pArray) {
|
void taosArrayDestroy(SArray* pArray) {
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -164,8 +164,7 @@ static int32_t vnodeBuildExprFromArithmeticStr(SSqlFunctionExpr* pExpr, SQueryMe
|
||||||
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
|
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
|
||||||
|
|
||||||
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
|
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
|
||||||
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz,
|
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz);
|
||||||
pExpr->pBase.arg[0].argBytes);
|
|
||||||
|
|
||||||
if (pBinExpr == NULL) {
|
if (pBinExpr == NULL) {
|
||||||
dError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
|
dError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
|
||||||
|
|
|
@ -207,11 +207,6 @@ typedef struct SDataBlockInfo {
|
||||||
int32_t sid;
|
int32_t sid;
|
||||||
} SDataBlockInfo;
|
} SDataBlockInfo;
|
||||||
|
|
||||||
typedef struct STableIDList {
|
|
||||||
STableId *tableIds;
|
|
||||||
int32_t num;
|
|
||||||
} STableIDList;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
} SFields;
|
} SFields;
|
||||||
|
|
||||||
|
@ -325,15 +320,15 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable
|
||||||
* @param pQueryHandle
|
* @param pQueryHandle
|
||||||
* @return table sid list. the invoker is responsible for the release of this the sid list.
|
* @return table sid list. the invoker is responsible for the release of this the sid list.
|
||||||
*/
|
*/
|
||||||
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
|
SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the qualified table sid for a super table according to the tag query expression.
|
* Get the qualified table id for a super table according to the tag query expression.
|
||||||
* @param stableid. super table sid
|
* @param stableid. super table sid
|
||||||
* @param pTagCond. tag query condition
|
* @param pTagCond. tag query condition
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond);
|
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,9 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <tlog.h>
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
|
#include "../../../query/inc/qast.h"
|
||||||
|
#include "../../../query/inc/tsqlfunction.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbFile.h"
|
#include "tsdbFile.h"
|
||||||
#include "tsdbMeta.h"
|
#include "tsdbMeta.h"
|
||||||
|
@ -413,6 +416,328 @@ SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList,
|
||||||
|
|
||||||
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {}
|
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {}
|
||||||
|
|
||||||
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
|
SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
|
||||||
|
|
||||||
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {}
|
static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) {
|
||||||
|
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
||||||
|
assert(pTable != NULL); //assert pTable is a super table
|
||||||
|
|
||||||
|
size_t size = tSkipListGetSize(pTable->pIndex);
|
||||||
|
SArray* pList = taosArrayInit(size, sizeof(STableId));
|
||||||
|
|
||||||
|
SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex);
|
||||||
|
while(tSkipListIterNext(iter)) {
|
||||||
|
STable* t = *(STable**) tSkipListIterGet(iter);
|
||||||
|
taosArrayPush(pList, &t->tableId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pList;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SSyntaxTreeFilterSupporter {
|
||||||
|
SSchema* pTagSchema;
|
||||||
|
int32_t numOfTags;
|
||||||
|
int32_t optr;
|
||||||
|
} SSyntaxTreeFilterSupporter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* convert the result pointer to STabObj instead of tSkipListNode
|
||||||
|
* @param pRes
|
||||||
|
*/
|
||||||
|
static void tansformQueryResult(SArray* pRes) {
|
||||||
|
if (pRes == NULL || taosArrayGetSize(pRes) == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size = taosArrayGetSize(pRes);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
// pRes->pRes[i] = ((tSkipListNode*)(pRes->pRes[i]))->pData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void tSQLListTraverseDestroyInfo(void* param) {
|
||||||
|
if (param == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tQueryInfo* pInfo = (tQueryInfo*)param;
|
||||||
|
tVariantDestroy(&(pInfo->q));
|
||||||
|
free(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
static char* convertTagQueryStr(const wchar_t* str, size_t len) {
|
||||||
|
char* mbs = NULL;
|
||||||
|
|
||||||
|
if (len > 0) {
|
||||||
|
mbs = calloc(1, (len + 1) * TSDB_NCHAR_SIZE);
|
||||||
|
taosUcs4ToMbs((void*) str, len * TSDB_NCHAR_SIZE, mbs); //todo add log
|
||||||
|
}
|
||||||
|
|
||||||
|
return mbs;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareStrVal(const void* pLeft, const void* pRight) {
|
||||||
|
int32_t ret = strcmp(pLeft, pRight);
|
||||||
|
if (ret == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return ret > 0 ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareWStrVal(const void* pLeft, const void* pRight) {
|
||||||
|
int32_t ret = wcscmp(pLeft, pRight);
|
||||||
|
if (ret == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return ret > 0 ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareIntVal(const void* pLeft, const void* pRight) {
|
||||||
|
DEFAULT_COMP(GET_INT64_VAL(pLeft), GET_INT64_VAL(pRight));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareIntDoubleVal(const void* pLeft, const void* pRight) {
|
||||||
|
DEFAULT_COMP(GET_INT64_VAL(pLeft), GET_DOUBLE_VAL(pRight));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareDoubleVal(const void* pLeft, const void* pRight) {
|
||||||
|
DEFAULT_COMP(GET_DOUBLE_VAL(pLeft), GET_DOUBLE_VAL(pRight));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareDoubleIntVal(const void* pLeft, const void* pRight) {
|
||||||
|
double ret = (*(double*)pLeft) - (*(int64_t*)pRight);
|
||||||
|
if (fabs(ret) < DBL_EPSILON) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return ret > 0 ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
|
||||||
|
SPatternCompareInfo pInfo = {'%', '_'};
|
||||||
|
|
||||||
|
const char* pattern = pRight;
|
||||||
|
const char* str = pLeft;
|
||||||
|
|
||||||
|
int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo);
|
||||||
|
|
||||||
|
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
|
||||||
|
SPatternCompareInfo pInfo = {'%', '_'};
|
||||||
|
|
||||||
|
const wchar_t* pattern = pRight;
|
||||||
|
const wchar_t* str = pLeft;
|
||||||
|
|
||||||
|
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
|
||||||
|
|
||||||
|
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static __compar_fn_t getFilterComparator(int32_t type, int32_t filterType, int32_t optr) {
|
||||||
|
__compar_fn_t comparator = NULL;
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
case TSDB_DATA_TYPE_BOOL: {
|
||||||
|
if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) {
|
||||||
|
comparator = compareIntVal;
|
||||||
|
} else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
comparator = compareIntDoubleVal;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) {
|
||||||
|
comparator = compareDoubleIntVal;
|
||||||
|
} else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
comparator = compareDoubleVal;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BINARY: {
|
||||||
|
assert(filterType == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
|
||||||
|
comparator = compareStrPatternComp;
|
||||||
|
} else { /* normal relational comparator */
|
||||||
|
comparator = compareStrVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
|
assert(filterType == TSDB_DATA_TYPE_NCHAR);
|
||||||
|
|
||||||
|
if (optr == TSDB_RELATION_LIKE) {
|
||||||
|
comparator = compareWStrPatternComp;
|
||||||
|
} else {
|
||||||
|
comparator = compareWStrVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
comparator = compareIntVal;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return comparator;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pSchema, int32_t* index,
|
||||||
|
int32_t* offset) {
|
||||||
|
*index = 0;
|
||||||
|
*offset = 0;
|
||||||
|
|
||||||
|
// filter on table name(TBNAME)
|
||||||
|
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
|
||||||
|
*index = TSDB_TBNAME_COLUMN_INDEX;
|
||||||
|
*offset = TSDB_TBNAME_COLUMN_INDEX;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while ((*index) < pSupporter->numOfTags) {
|
||||||
|
if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
|
||||||
|
pSupporter->pTagSchema[*index].type == pSchema->type &&
|
||||||
|
strcmp(pSupporter->pTagSchema[*index].name, pSchema->name) == 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
(*offset) += pSupporter->pTagSchema[(*index)++].bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void filterPrepare(void* expr, void* param) {
|
||||||
|
tSQLBinaryExpr *pExpr = (tSQLBinaryExpr*) expr;
|
||||||
|
if (pExpr->info != NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t i = 0, offset = 0;
|
||||||
|
pExpr->info = calloc(1, sizeof(tQueryInfo));
|
||||||
|
|
||||||
|
tQueryInfo* pInfo = pExpr->info;
|
||||||
|
SSyntaxTreeFilterSupporter* pSupporter = (SSyntaxTreeFilterSupporter*)param;
|
||||||
|
|
||||||
|
tVariant* pCond = pExpr->pRight->pVal;
|
||||||
|
SSchema* pSchema = pExpr->pLeft->pSchema;
|
||||||
|
|
||||||
|
getTagColumnInfo(pSupporter, pSchema, &i, &offset);
|
||||||
|
assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
|
||||||
|
assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
|
||||||
|
|
||||||
|
pInfo->sch = *pSchema;
|
||||||
|
pInfo->colIdx = i;
|
||||||
|
pInfo->optr = pExpr->nSQLBinaryOptr;
|
||||||
|
pInfo->offset = offset;
|
||||||
|
pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr);
|
||||||
|
|
||||||
|
tVariantAssign(&pInfo->q, pCond);
|
||||||
|
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
|
||||||
|
tQueryInfo* pInfo = (tQueryInfo*)param;
|
||||||
|
|
||||||
|
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
|
||||||
|
|
||||||
|
char buf[TSDB_MAX_TAGS_LEN] = {0};
|
||||||
|
|
||||||
|
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
|
||||||
|
int8_t type = pInfo->sch.type;
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
if (pInfo->q.nType == TSDB_DATA_TYPE_BINARY || pInfo->q.nType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
ret = pInfo->compare(val, pInfo->q.pz);
|
||||||
|
} else {
|
||||||
|
tVariant t = {0};
|
||||||
|
tVariantCreateFromBinary(&t, val, (uint32_t) pInfo->sch.bytes, type);
|
||||||
|
|
||||||
|
ret = pInfo->compare(&t.i64Key, &pInfo->q.i64Key);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (pInfo->optr) {
|
||||||
|
case TSDB_RELATION_EQUAL: {
|
||||||
|
return ret == 0;
|
||||||
|
}
|
||||||
|
case TSDB_RELATION_NOT_EQUAL: {
|
||||||
|
return ret != 0;
|
||||||
|
}
|
||||||
|
case TSDB_RELATION_LARGE_EQUAL: {
|
||||||
|
return ret >= 0;
|
||||||
|
}
|
||||||
|
case TSDB_RELATION_LARGE: {
|
||||||
|
return ret > 0;
|
||||||
|
}
|
||||||
|
case TSDB_RELATION_LESS_EQUAL: {
|
||||||
|
return ret <= 0;
|
||||||
|
}
|
||||||
|
case TSDB_RELATION_LESS: {
|
||||||
|
return ret < 0;
|
||||||
|
}
|
||||||
|
case TSDB_RELATION_LIKE: {
|
||||||
|
return ret == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
assert(false);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mgmtFilterMeterByIndex(STable* pSTable, SArray* pRes, const char* pCond) {
|
||||||
|
STColumn* stcol = schemaColAt(pSTable->tagSchema, 0);
|
||||||
|
|
||||||
|
tSQLBinaryExpr* pExpr = NULL;
|
||||||
|
tSQLBinaryExprFromString(&pExpr, stcol, schemaNCols(pSTable->tagSchema), pCond, strlen(pCond));
|
||||||
|
|
||||||
|
// failed to build expression, no result, return immediately
|
||||||
|
if (pExpr == NULL) {
|
||||||
|
mError("table:%" PRIu64 ", no result returned, error in super table query expression:%s", pSTable->tableId.uid, pCond);
|
||||||
|
tfree(pCond);
|
||||||
|
|
||||||
|
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
}
|
||||||
|
|
||||||
|
// query according to the binary expression
|
||||||
|
SSyntaxTreeFilterSupporter s = {.pTagSchema = stcol, .numOfTags = schemaNCols(pSTable->tagSchema)};
|
||||||
|
|
||||||
|
SBinaryFilterSupp supp = {.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback,
|
||||||
|
.setupInfoFn = (__do_filter_suppl_fn_t)filterPrepare,
|
||||||
|
.pExtInfo = &s};
|
||||||
|
|
||||||
|
tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp);
|
||||||
|
tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo);
|
||||||
|
|
||||||
|
tansformQueryResult(pRes);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
|
||||||
|
// no condition, all tables created according to the stable will involved in querying
|
||||||
|
if (pTagCond == NULL || wcslen(pTagCond) == 0) {
|
||||||
|
return createTableIdArrayList(tsdb, uid);
|
||||||
|
} else {
|
||||||
|
char* str = convertTagQueryStr(pTagCond, len);
|
||||||
|
SArray* result = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
||||||
|
assert(pSTable != NULL);
|
||||||
|
|
||||||
|
if (mgmtFilterMeterByIndex(pSTable, result, str) == TSDB_CODE_SUCCESS) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue