Merge pull request #13418 from taosdata/enh/idxFilter
enh: refactor tag filter code
This commit is contained in:
commit
3eb1d48815
|
@ -49,7 +49,8 @@ typedef struct {
|
||||||
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
|
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
|
||||||
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
|
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
|
||||||
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
|
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
|
||||||
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON))
|
#define IS_VAR_DATA_TYPE(t) \
|
||||||
|
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON))
|
||||||
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
|
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
|
||||||
|
|
||||||
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
|
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
|
||||||
|
@ -179,6 +180,7 @@ typedef struct {
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
//TODO: use varchar(0) to represent NULL type
|
//TODO: use varchar(0) to represent NULL type
|
||||||
#define IS_VAR_NULL_TYPE(_t, _b) ((_t) == TSDB_DATA_TYPE_VARCHAR && (_b) == 0)
|
#define IS_VAR_NULL_TYPE(_t, _b) ((_t) == TSDB_DATA_TYPE_VARCHAR && (_b) == 0)
|
||||||
#define IS_NULL_TYPE(_t) ((_t) == TSDB_DATA_TYPE_NULL)
|
#define IS_NULL_TYPE(_t) ((_t) == TSDB_DATA_TYPE_NULL)
|
||||||
|
@ -189,7 +191,8 @@ typedef struct {
|
||||||
#define IS_INTEGER_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)))
|
#define IS_INTEGER_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)))
|
||||||
|
|
||||||
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
|
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
|
||||||
#define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
#define IS_MATHABLE_TYPE(_t) \
|
||||||
|
(IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
||||||
|
|
||||||
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
|
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
|
||||||
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
|
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
|
||||||
|
|
|
@ -605,8 +605,6 @@ typedef struct {
|
||||||
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
SIdxCursor *pCursor = NULL;
|
SIdxCursor *pCursor = NULL;
|
||||||
|
|
||||||
char *tagData = param->val;
|
|
||||||
|
|
||||||
int32_t ret = 0, valid = 0;
|
int32_t ret = 0, valid = 0;
|
||||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||||
pCursor->pMeta = pMeta;
|
pCursor->pMeta = pMeta;
|
||||||
|
@ -623,12 +621,16 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
int32_t nKey = 0;
|
int32_t nKey = 0;
|
||||||
|
|
||||||
int32_t nTagData = 0;
|
int32_t nTagData = 0;
|
||||||
|
void * tagData = NULL;
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(param->type)) {
|
if (IS_VAR_DATA_TYPE(param->type)) {
|
||||||
nTagData = strlen(param->val);
|
tagData = varDataVal(param->val);
|
||||||
|
nTagData = varDataLen(param->val);
|
||||||
} else {
|
} else {
|
||||||
|
tagData = param->val;
|
||||||
nTagData = tDataTypes[param->type].bytes;
|
nTagData = tDataTypes[param->type].bytes;
|
||||||
}
|
}
|
||||||
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, nTagData, pCursor->type,
|
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
|
||||||
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
|
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -637,6 +639,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
|
if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
void * entryKey = NULL, *entryVal = NULL;
|
void * entryKey = NULL, *entryVal = NULL;
|
||||||
int32_t nEntryKey, nEntryVal;
|
int32_t nEntryKey, nEntryVal;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -649,7 +652,12 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
||||||
if (cmp == 0) {
|
if (cmp == 0) {
|
||||||
// match
|
// match
|
||||||
tb_uid_t tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
|
tb_uid_t tuid = 0;
|
||||||
|
if (IS_VAR_DATA_TYPE(pKey->type)) {
|
||||||
|
tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
|
||||||
|
} else {
|
||||||
|
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
|
||||||
|
}
|
||||||
taosArrayPush(pUids, &tuid);
|
taosArrayPush(pUids, &tuid);
|
||||||
} else if (cmp == 1) {
|
} else if (cmp == 1) {
|
||||||
// not match but should continue to iter
|
// not match but should continue to iter
|
||||||
|
|
|
@ -341,6 +341,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
|
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
|
||||||
void * pVal = NULL;
|
void * pVal = NULL;
|
||||||
int nVal = 0;
|
int nVal = 0;
|
||||||
|
@ -753,16 +754,11 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
|
||||||
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
||||||
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
||||||
// int32_t nTagData = 0;
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
|
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + VARSTR_HEADER_SIZE + sizeof(tb_uid_t);
|
||||||
// if (pTagData) {
|
} else {
|
||||||
// if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
// nTagData = varDataTLen(pTagData);
|
|
||||||
// } else {
|
|
||||||
// nTagData = tDataTypes[type].bytes;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
|
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
|
||||||
|
}
|
||||||
|
|
||||||
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
|
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
|
||||||
if (*ppTagIdxKey == NULL) {
|
if (*ppTagIdxKey == NULL) {
|
||||||
|
@ -774,8 +770,16 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_
|
||||||
(*ppTagIdxKey)->cid = cid;
|
(*ppTagIdxKey)->cid = cid;
|
||||||
(*ppTagIdxKey)->isNull = (pTagData == NULL) ? 1 : 0;
|
(*ppTagIdxKey)->isNull = (pTagData == NULL) ? 1 : 0;
|
||||||
(*ppTagIdxKey)->type = type;
|
(*ppTagIdxKey)->type = type;
|
||||||
if (nTagData) memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
|
||||||
|
// refactor
|
||||||
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
|
memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE);
|
||||||
|
memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData);
|
||||||
|
*(tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData) = uid;
|
||||||
|
} else {
|
||||||
|
memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
||||||
*(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid;
|
*(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -265,23 +265,24 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
|
||||||
typedef int (*Filter)(void *a, void *b, int16_t dtype);
|
typedef int (*Filter)(void *a, void *b, int16_t dtype);
|
||||||
|
|
||||||
int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = indexGetCompar(dtype);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
||||||
}
|
}
|
||||||
int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = indexGetCompar(dtype);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
||||||
}
|
}
|
||||||
int sifLessEqual(void *a, void *b, int16_t dtype) {
|
int sifLessEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = indexGetCompar(dtype);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
||||||
}
|
}
|
||||||
int sifLessThan(void *a, void *b, int16_t dtype) {
|
int sifLessThan(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = indexGetCompar(dtype);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
||||||
}
|
}
|
||||||
int sifEqual(void *a, void *b, int16_t dtype) {
|
int sifEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = indexGetCompar(dtype);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
|
//__compar_fn_t func = indexGetCompar(dtype);
|
||||||
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
||||||
}
|
}
|
||||||
static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
|
@ -397,38 +398,62 @@ static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output)
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static sif_func_t sifGetOperFn(int32_t funcId) {
|
static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) {
|
||||||
// impl later
|
// impl later
|
||||||
|
*status = SFLT_ACCURATE_INDEX;
|
||||||
switch (funcId) {
|
switch (funcId) {
|
||||||
case OP_TYPE_GREATER_THAN:
|
case OP_TYPE_GREATER_THAN:
|
||||||
return sifGreaterThanFunc;
|
*func = sifGreaterThanFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_GREATER_EQUAL:
|
case OP_TYPE_GREATER_EQUAL:
|
||||||
return sifGreaterEqualFunc;
|
*func = sifGreaterEqualFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_LOWER_THAN:
|
case OP_TYPE_LOWER_THAN:
|
||||||
return sifLessThanFunc;
|
*func = sifLessThanFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_LOWER_EQUAL:
|
case OP_TYPE_LOWER_EQUAL:
|
||||||
return sifLessEqualFunc;
|
*func = sifLessEqualFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_EQUAL:
|
case OP_TYPE_EQUAL:
|
||||||
return sifEqualFunc;
|
*func = sifEqualFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_NOT_EQUAL:
|
case OP_TYPE_NOT_EQUAL:
|
||||||
return sifNotEqualFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifNotEqualFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_IN:
|
case OP_TYPE_IN:
|
||||||
return sifInFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifInFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_NOT_IN:
|
case OP_TYPE_NOT_IN:
|
||||||
return sifNotInFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifNotInFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_LIKE:
|
case OP_TYPE_LIKE:
|
||||||
return sifLikeFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifLikeFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_NOT_LIKE:
|
case OP_TYPE_NOT_LIKE:
|
||||||
return sifNotLikeFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifNotLikeFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_MATCH:
|
case OP_TYPE_MATCH:
|
||||||
return sifMatchFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifMatchFunc;
|
||||||
|
return 0;
|
||||||
case OP_TYPE_NMATCH:
|
case OP_TYPE_NMATCH:
|
||||||
return sifNotMatchFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifNotMatchFunc;
|
||||||
|
return 0;
|
||||||
default:
|
default:
|
||||||
return sifNullFunc;
|
*status = SFLT_NOT_INDEX;
|
||||||
|
*func = sifNullFunc;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
return sifNullFunc;
|
return 0;
|
||||||
}
|
}
|
||||||
|
// typedef struct filterFuncDict {
|
||||||
|
|
||||||
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t nParam = sifGetOperParamNum(node->opType);
|
int32_t nParam = sifGetOperParamNum(node->opType);
|
||||||
|
@ -441,18 +466,14 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
|
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
output->arg = ctx->arg;
|
output->arg = ctx->arg;
|
||||||
sif_func_t operFn = sifGetOperFn(node->opType);
|
|
||||||
if (ctx->noExec && operFn == NULL) {
|
|
||||||
output->status = SFLT_NOT_INDEX;
|
|
||||||
} else {
|
|
||||||
output->status = SFLT_ACCURATE_INDEX;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
sif_func_t operFn = sifNullFunc;
|
||||||
|
code = sifGetOperFn(node->opType, &operFn, &output->status);
|
||||||
if (ctx->noExec) {
|
if (ctx->noExec) {
|
||||||
SIF_RET(code);
|
SIF_RET(code);
|
||||||
}
|
} else {
|
||||||
|
|
||||||
return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output);
|
return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output);
|
||||||
|
}
|
||||||
_return:
|
_return:
|
||||||
taosMemoryFree(params);
|
taosMemoryFree(params);
|
||||||
SIF_RET(code);
|
SIF_RET(code);
|
||||||
|
@ -477,7 +498,7 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
|
||||||
} else if (node->condType == LOGIC_COND_TYPE_OR) {
|
} else if (node->condType == LOGIC_COND_TYPE_OR) {
|
||||||
taosArrayAddAll(output->result, params[m].result);
|
taosArrayAddAll(output->result, params[m].result);
|
||||||
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
|
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
|
||||||
taosArrayAddAll(output->result, params[m].result);
|
// taosArrayAddAll(output->result, params[m].result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1151,7 +1151,6 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
||||||
tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread);
|
tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread);
|
||||||
|
|
||||||
tsem_t* pSem = pCtx->pSem;
|
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue