pass expr filter to taosd
This commit is contained in:
parent
a19ed6e956
commit
f0a32e5c3f
|
@ -96,20 +96,6 @@ typedef struct STableMetaInfo {
|
|||
SArray *tagColList; // SArray<SColumn*>, involved tag columns
|
||||
} STableMetaInfo;
|
||||
|
||||
/* the structure for sql function in select clause */
|
||||
typedef struct SSqlExpr {
|
||||
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
|
||||
SColIndex colInfo;
|
||||
uint64_t uid; // refactor use the pointer
|
||||
int16_t functionId; // function id in aAgg array
|
||||
int16_t resType; // return value type
|
||||
int16_t resBytes; // length of return value
|
||||
int32_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
tVariant param[3]; // parameters are not more than 3
|
||||
int32_t offset; // sub result column value of arithmetic expression.
|
||||
int16_t resColId; // result column id
|
||||
} SSqlExpr;
|
||||
|
||||
typedef struct SColumnIndex {
|
||||
int16_t tableIndex;
|
||||
|
@ -129,6 +115,22 @@ typedef struct SColumn {
|
|||
SColumnFilterInfo *filterInfo;
|
||||
} SColumn;
|
||||
|
||||
/* the structure for sql function in select clause */
|
||||
typedef struct SSqlExpr {
|
||||
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
|
||||
SColIndex colInfo;
|
||||
uint64_t uid; // refactor use the pointer
|
||||
int16_t functionId; // function id in aAgg array
|
||||
int16_t resType; // return value type
|
||||
int16_t resBytes; // length of return value
|
||||
int32_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
tVariant param[3]; // parameters are not more than 3
|
||||
int32_t offset; // sub result column value of arithmetic expression.
|
||||
int16_t resColId; // result column id
|
||||
SColumn *pFilter; // expr filter
|
||||
} SSqlExpr;
|
||||
|
||||
typedef struct SExprFilter {
|
||||
tSQLExpr *pExpr; //used for having parse
|
||||
SSqlExpr *pSqlExpr;
|
||||
|
|
|
@ -6735,6 +6735,7 @@ static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
|
||||
pFieldFilters->pFilters = pFilters;
|
||||
pFieldFilters->pSqlExpr = pSqlExpr;
|
||||
pSqlExpr->pFilter = pFilters;
|
||||
pInfo->pFieldFilters = pFieldFilters;
|
||||
}
|
||||
|
||||
|
|
|
@ -838,8 +838,44 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pSqlFuncExpr->functionId = htons(pExpr->functionId);
|
||||
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
|
||||
pSqlFuncExpr->resColId = htons(pExpr->resColId);
|
||||
if (pExpr->pFilter && pExpr->pFilter->numOfFilters > 0) {
|
||||
pSqlFuncExpr->filterNum = htonl(pExpr->pFilter->numOfFilters);
|
||||
} else {
|
||||
pSqlFuncExpr->filterNum = 0;
|
||||
}
|
||||
|
||||
pMsg += sizeof(SSqlFuncMsg);
|
||||
|
||||
if (pSqlFuncExpr->filterNum) {
|
||||
pMsg += sizeof(SColumnFilterInfo) * pExpr->pFilter->numOfFilters;
|
||||
|
||||
// append the filter information after the basic column information
|
||||
for (int32_t f = 0; f < pExpr->pFilter->numOfFilters; ++f) {
|
||||
SColumnFilterInfo *pColFilter = &pExpr->pFilter->filterInfo[f];
|
||||
|
||||
SColumnFilterInfo *pFilterMsg = &pSqlFuncExpr->filterInfo[f];
|
||||
pFilterMsg->filterstr = htons(pColFilter->filterstr);
|
||||
|
||||
if (pColFilter->filterstr) {
|
||||
pFilterMsg->len = htobe64(pColFilter->len);
|
||||
memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
|
||||
pMsg += (pColFilter->len + 1); // append the additional filter binary info
|
||||
} else {
|
||||
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
|
||||
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
|
||||
}
|
||||
|
||||
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
|
||||
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
|
||||
|
||||
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
|
||||
tscError("invalid filter info");
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
|
||||
// todo add log
|
||||
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
|
||||
|
|
|
@ -390,33 +390,6 @@ typedef struct SColIndex {
|
|||
char name[TSDB_COL_NAME_LEN];
|
||||
} SColIndex;
|
||||
|
||||
/* sql function msg, to describe the message to vnode about sql function
|
||||
* operations in select clause */
|
||||
typedef struct SSqlFuncMsg {
|
||||
int16_t functionId;
|
||||
int16_t numOfParams;
|
||||
int16_t resColId; // result column id, id of the current output column
|
||||
|
||||
SColIndex colInfo;
|
||||
struct ArgElem {
|
||||
int16_t argType;
|
||||
int16_t argBytes;
|
||||
union {
|
||||
double d;
|
||||
int64_t i64;
|
||||
char * pz;
|
||||
} argValue;
|
||||
} arg[3];
|
||||
} SSqlFuncMsg;
|
||||
|
||||
typedef struct SExprInfo {
|
||||
SSqlFuncMsg base;
|
||||
struct tExprNode* pExpr;
|
||||
int16_t bytes;
|
||||
int16_t type;
|
||||
int32_t interBytes;
|
||||
int64_t uid;
|
||||
} SExprInfo;
|
||||
|
||||
typedef struct SColumnFilterInfo {
|
||||
int16_t lowerRelOptr;
|
||||
|
@ -439,6 +412,39 @@ typedef struct SColumnFilterInfo {
|
|||
};
|
||||
} SColumnFilterInfo;
|
||||
|
||||
/* sql function msg, to describe the message to vnode about sql function
|
||||
* operations in select clause */
|
||||
typedef struct SSqlFuncMsg {
|
||||
int16_t functionId;
|
||||
int16_t numOfParams;
|
||||
int16_t resColId; // result column id, id of the current output column
|
||||
|
||||
SColIndex colInfo;
|
||||
struct ArgElem {
|
||||
int16_t argType;
|
||||
int16_t argBytes;
|
||||
union {
|
||||
double d;
|
||||
int64_t i64;
|
||||
char * pz;
|
||||
} argValue;
|
||||
} arg[3];
|
||||
|
||||
int32_t filterNum;
|
||||
SColumnFilterInfo filterInfo[];
|
||||
} SSqlFuncMsg;
|
||||
|
||||
|
||||
typedef struct SExprInfo {
|
||||
SSqlFuncMsg base;
|
||||
SColumnFilterInfo * pFilter;
|
||||
struct tExprNode* pExpr;
|
||||
int16_t bytes;
|
||||
int16_t type;
|
||||
int32_t interBytes;
|
||||
int64_t uid;
|
||||
} SExprInfo;
|
||||
|
||||
/*
|
||||
* for client side struct, we only need the column id, type, bytes are not necessary
|
||||
* But for data in vnode side, we need all the following information.
|
||||
|
|
|
@ -6120,9 +6120,35 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
pExprMsg->functionId = htons(pExprMsg->functionId);
|
||||
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
|
||||
pExprMsg->resColId = htons(pExprMsg->resColId);
|
||||
pExprMsg->filterNum = htonl(pExprMsg->filterNum);
|
||||
|
||||
pMsg += sizeof(SSqlFuncMsg);
|
||||
|
||||
SColumnFilterInfo* pExprFilterInfo = pExprMsg->filterInfo;
|
||||
|
||||
pMsg += sizeof(SColumnFilterInfo) * pExprMsg->filterNum;
|
||||
|
||||
for (int32_t f = 0; f < pExprMsg->filterNum; ++f) {
|
||||
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pExprFilterInfo;
|
||||
|
||||
pFilterMsg->filterstr = htons(pFilterMsg->filterstr);
|
||||
|
||||
if (pFilterMsg->filterstr) {
|
||||
pFilterMsg->len = htobe64(pFilterMsg->len);
|
||||
|
||||
pFilterMsg->pz = (int64_t)pMsg;
|
||||
pMsg += (pFilterMsg->len + 1);
|
||||
} else {
|
||||
pFilterMsg->lowerBndi = htobe64(pFilterMsg->lowerBndi);
|
||||
pFilterMsg->upperBndi = htobe64(pFilterMsg->upperBndi);
|
||||
}
|
||||
|
||||
pFilterMsg->lowerRelOptr = htons(pFilterMsg->lowerRelOptr);
|
||||
pFilterMsg->upperRelOptr = htons(pFilterMsg->upperRelOptr);
|
||||
|
||||
pExprFilterInfo++;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
|
||||
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
|
||||
pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes);
|
||||
|
@ -6304,6 +6330,42 @@ _cleanup:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) {
|
||||
if (filterNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*dst = calloc(filterNum, sizeof(*src));
|
||||
if (*dst == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(*dst, src, sizeof(*src) * filterNum);
|
||||
|
||||
for (int32_t i = 0; i < filterNum; i++) {
|
||||
if (dst[i]->filterstr && dst[i]->len > 0) {
|
||||
void *pz = calloc(1, dst[i]->len + 1);
|
||||
|
||||
if (pz == NULL) {
|
||||
if (i == 0) {
|
||||
free(*dst);
|
||||
} else {
|
||||
freeColumnFilterInfo(*dst, i);
|
||||
}
|
||||
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pz, (void *)src->pz, src->len + 1);
|
||||
|
||||
dst[i]->pz = (int64_t)pz;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
|
||||
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
|
||||
|
||||
|
@ -6396,6 +6458,13 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu
|
|||
type = s->type;
|
||||
bytes = s->bytes;
|
||||
}
|
||||
|
||||
if (pExprs[i].base.filterNum > 0) {
|
||||
int32_t ret = cloneExprFilterInfo(&pExprs[i].pFilter, pExprMsg[i]->filterInfo, pExprMsg[i]->filterNum);
|
||||
if (ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
|
||||
|
@ -6770,6 +6839,10 @@ _cleanup_query:
|
|||
tExprTreeDestroy(pExprInfo->pExpr, NULL);
|
||||
pExprInfo->pExpr = NULL;
|
||||
}
|
||||
|
||||
if (pExprInfo->pFilter) {
|
||||
freeColumnFilterInfo(pExprInfo->pFilter, pExprInfo->base.filterNum);
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pExprs);
|
||||
|
@ -6850,7 +6923,7 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfFilters; i++) {
|
||||
if (pFilter[i].filterstr) {
|
||||
if (pFilter[i].filterstr && pFilter[i].pz) {
|
||||
free((void*)(pFilter[i].pz));
|
||||
}
|
||||
}
|
||||
|
@ -6892,6 +6965,10 @@ static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) {
|
|||
if (pExprInfo[i].pExpr != NULL) {
|
||||
tExprTreeDestroy(pExprInfo[i].pExpr, NULL);
|
||||
}
|
||||
|
||||
if (pExprInfo[i].pFilter) {
|
||||
freeColumnFilterInfo(pExprInfo[i].pFilter, pExprInfo[i].base.filterNum);
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pExprInfo);
|
||||
|
|
Loading…
Reference in New Issue