Merge pull request #16685 from taosdata/enh/add_compile_opt
enh: add compile opt
This commit is contained in:
commit
0c42158194
|
@ -81,28 +81,28 @@ __compar_fn_t idxGetCompar(int8_t type) {
|
||||||
}
|
}
|
||||||
return getComparFunc(type, 0);
|
return getComparFunc(type, 0);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_LESS_THAN, a, b, type);
|
return tCompare(func, QUERY_LESS_THAN, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_LESS_EQUAL, a, b, type);
|
return tCompare(func, QUERY_LESS_EQUAL, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_GREATER_THAN, a, b, type);
|
return tCompare(func, QUERY_GREATER_THAN, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
|
return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TExeCond tCompareContains(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareContains(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_TERM, a, b, type);
|
return tCompare(func, QUERY_TERM, a, b, type);
|
||||||
}
|
}
|
||||||
static TExeCond tCompareEqual(void* a, void* b, int8_t type) {
|
static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) {
|
||||||
__compar_fn_t func = idxGetCompar(type);
|
__compar_fn_t func = idxGetCompar(type);
|
||||||
return tCompare(func, QUERY_TERM, a, b, type);
|
return tCompare(func, QUERY_TERM, a, b, type);
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ typedef struct SIFCtx {
|
||||||
SIndexMetaArg arg;
|
SIndexMetaArg arg;
|
||||||
} SIFCtx;
|
} SIFCtx;
|
||||||
|
|
||||||
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
static FORCE_INLINE int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
||||||
if (src == OP_TYPE_GREATER_THAN) {
|
if (src == OP_TYPE_GREATER_THAN) {
|
||||||
*dst = QUERY_GREATER_THAN;
|
*dst = QUERY_GREATER_THAN;
|
||||||
} else if (src == OP_TYPE_GREATER_EQUAL) {
|
} else if (src == OP_TYPE_GREATER_EQUAL) {
|
||||||
|
@ -110,10 +110,9 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
||||||
|
|
||||||
static sif_func_t sifNullFunc = NULL;
|
static sif_func_t sifNullFunc = NULL;
|
||||||
|
|
||||||
static void sifFreeParam(SIFParam *param) {
|
static FORCE_INLINE void sifFreeParam(SIFParam *param) {
|
||||||
if (param == NULL) return;
|
if (param == NULL) return;
|
||||||
|
|
||||||
taosArrayDestroy(param->result);
|
taosArrayDestroy(param->result);
|
||||||
|
@ -123,7 +122,7 @@ static void sifFreeParam(SIFParam *param) {
|
||||||
param->pFilter = NULL;
|
param->pFilter = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetOperParamNum(EOperatorType ty) {
|
static FORCE_INLINE int32_t sifGetOperParamNum(EOperatorType ty) {
|
||||||
if (OP_TYPE_IS_NULL == ty || OP_TYPE_IS_NOT_NULL == ty || OP_TYPE_IS_TRUE == ty || OP_TYPE_IS_NOT_TRUE == ty ||
|
if (OP_TYPE_IS_NULL == ty || OP_TYPE_IS_NOT_NULL == ty || OP_TYPE_IS_TRUE == ty || OP_TYPE_IS_NOT_TRUE == ty ||
|
||||||
OP_TYPE_IS_FALSE == ty || OP_TYPE_IS_NOT_FALSE == ty || OP_TYPE_IS_UNKNOWN == ty ||
|
OP_TYPE_IS_FALSE == ty || OP_TYPE_IS_NOT_FALSE == ty || OP_TYPE_IS_UNKNOWN == ty ||
|
||||||
OP_TYPE_IS_NOT_UNKNOWN == ty || OP_TYPE_MINUS == ty) {
|
OP_TYPE_IS_NOT_UNKNOWN == ty || OP_TYPE_MINUS == ty) {
|
||||||
|
@ -131,14 +130,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
|
||||||
}
|
}
|
||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
static int32_t sifValidOp(EOperatorType ty) {
|
static FORCE_INLINE int32_t sifValidOp(EOperatorType ty) {
|
||||||
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
|
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
|
||||||
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
|
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int32_t sifValidColumn(SColumnNode *cn) {
|
static FORCE_INLINE int32_t sifValidColumn(SColumnNode *cn) {
|
||||||
// add more check
|
// add more check
|
||||||
if (cn == NULL) {
|
if (cn == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -149,7 +148,7 @@ static int32_t sifValidColumn(SColumnNode *cn) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) {
|
static FORCE_INLINE SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) {
|
||||||
// enh rule later
|
// enh rule later
|
||||||
if (type == LOGIC_COND_TYPE_AND) {
|
if (type == LOGIC_COND_TYPE_AND) {
|
||||||
if (ls == SFLT_NOT_INDEX || rs == SFLT_NOT_INDEX) {
|
if (ls == SFLT_NOT_INDEX || rs == SFLT_NOT_INDEX) {
|
||||||
|
@ -167,7 +166,7 @@ static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SI
|
||||||
return SFLT_NOT_INDEX;
|
return SFLT_NOT_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
// covert data From snode;
|
// covert data From snode;
|
||||||
SValueNode *vn = (SValueNode *)node;
|
SValueNode *vn = (SValueNode *)node;
|
||||||
|
|
||||||
|
@ -205,7 +204,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
static FORCE_INLINE int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
SOperatorNode *nd = (SOperatorNode *)node;
|
SOperatorNode *nd = (SOperatorNode *)node;
|
||||||
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
||||||
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
||||||
|
@ -355,30 +354,30 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef int (*Filter)(void *a, void *b, int16_t dtype);
|
typedef int (*FilterFunc)(void *a, void *b, int16_t dtype);
|
||||||
|
|
||||||
int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
||||||
}
|
}
|
||||||
int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
||||||
}
|
}
|
||||||
int sifLessEqual(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifLessEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
||||||
}
|
}
|
||||||
int sifLessThan(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifLessThan(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
||||||
}
|
}
|
||||||
int sifEqual(void *a, void *b, int16_t dtype) {
|
static FORCE_INLINE int sifEqual(void *a, void *b, int16_t dtype) {
|
||||||
__compar_fn_t func = getComparFunc(dtype, 0);
|
__compar_fn_t func = getComparFunc(dtype, 0);
|
||||||
//__compar_fn_t func = idxGetCompar(dtype);
|
//__compar_fn_t func = idxGetCompar(dtype);
|
||||||
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
||||||
}
|
}
|
||||||
static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
|
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
|
||||||
*reverse = true;
|
*reverse = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -470,8 +469,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
|
||||||
indexMultiTermQueryAdd(mtm, tm, qtype);
|
indexMultiTermQueryAdd(mtm, tm, qtype);
|
||||||
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
|
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
|
||||||
} else {
|
} else {
|
||||||
bool reverse;
|
bool reverse;
|
||||||
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
|
FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse);
|
||||||
|
|
||||||
SMetaFltParam param = {.suid = arg->suid,
|
SMetaFltParam param = {.suid = arg->suid,
|
||||||
.cid = left->colId,
|
.cid = left->colId,
|
||||||
|
@ -498,72 +497,72 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LOWER_THAN;
|
int id = OP_TYPE_LOWER_THAN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LOWER_EQUAL;
|
int id = OP_TYPE_LOWER_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_GREATER_THAN;
|
int id = OP_TYPE_GREATER_THAN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_GREATER_EQUAL;
|
int id = OP_TYPE_GREATER_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_EQUAL;
|
int id = OP_TYPE_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_EQUAL;
|
int id = OP_TYPE_NOT_EQUAL;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_IN;
|
int id = OP_TYPE_IN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_IN;
|
int id = OP_TYPE_NOT_IN;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_LIKE;
|
int id = OP_TYPE_LIKE;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NOT_LIKE;
|
int id = OP_TYPE_NOT_LIKE;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_MATCH;
|
int id = OP_TYPE_MATCH;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_NMATCH;
|
int id = OP_TYPE_NMATCH;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
int id = OP_TYPE_JSON_CONTAINS;
|
int id = OP_TYPE_JSON_CONTAINS;
|
||||||
return sifDoIndex(left, right, id, output);
|
return sifDoIndex(left, right, id, output);
|
||||||
}
|
}
|
||||||
static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) {
|
static FORCE_INLINE int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) {
|
||||||
// return 0
|
// return 0
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static FORCE_INLINE int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
// add more except
|
// add more except
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) {
|
static FORCE_INLINE int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) {
|
||||||
// impl later
|
// impl later
|
||||||
*status = SFLT_ACCURATE_INDEX;
|
*status = SFLT_ACCURATE_INDEX;
|
||||||
switch (funcId) {
|
switch (funcId) {
|
||||||
|
|
|
@ -126,22 +126,22 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyHttpClient(SHttpClient* cli) {
|
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
||||||
taosMemoryFree(cli->wbuf);
|
taosMemoryFree(cli->wbuf);
|
||||||
taosMemoryFree(cli->rbuf);
|
taosMemoryFree(cli->rbuf);
|
||||||
taosMemoryFree(cli->addr);
|
taosMemoryFree(cli->addr);
|
||||||
taosMemoryFree(cli);
|
taosMemoryFree(cli);
|
||||||
}
|
}
|
||||||
static void clientCloseCb(uv_handle_t* handle) {
|
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
}
|
}
|
||||||
static void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
buf->base = cli->rbuf;
|
buf->base = cli->rbuf;
|
||||||
buf->len = HTTP_RECV_BUF_SIZE;
|
buf->len = HTTP_RECV_BUF_SIZE;
|
||||||
}
|
}
|
||||||
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
uError("http-report recv error:%s", uv_err_name(nread));
|
uError("http-report recv error:%s", uv_err_name(nread));
|
||||||
|
@ -173,7 +173,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||||
uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(server);
|
uint32_t ip = taosGetIpv4FromFqdn(server);
|
||||||
if (ip == 0xffffffff) {
|
if (ip == 0xffffffff) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
|
@ -69,11 +69,9 @@ typedef struct SCliThrd {
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
uv_prepare_t* prepare;
|
uv_prepare_t* prepare;
|
||||||
void* pool; // conn pool
|
void* pool; // conn pool
|
||||||
|
// timer handles
|
||||||
SArray* timerList;
|
SArray* timerList;
|
||||||
|
|
||||||
// msg queue
|
// msg queue
|
||||||
|
|
||||||
queue msg;
|
queue msg;
|
||||||
TdThreadMutex msgMtx;
|
TdThreadMutex msgMtx;
|
||||||
SDelayQueue* delayQueue;
|
SDelayQueue* delayQueue;
|
||||||
|
@ -108,7 +106,7 @@ static void cliReadTimeoutCb(uv_timer_t* handle);
|
||||||
// register timer in each thread to clear expire conn
|
// register timer in each thread to clear expire conn
|
||||||
// static void cliTimeoutCb(uv_timer_t* handle);
|
// static void cliTimeoutCb(uv_timer_t* handle);
|
||||||
// alloc buffer for recv
|
// alloc buffer for recv
|
||||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
// callback after recv nbytes from socket
|
// callback after recv nbytes from socket
|
||||||
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
// callback after send data to socket
|
// callback after send data to socket
|
||||||
|
@ -132,10 +130,10 @@ static void cliSend(SCliConn* pConn);
|
||||||
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
||||||
|
|
||||||
// cli util func
|
// cli util func
|
||||||
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
||||||
static void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
||||||
|
|
||||||
static int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
|
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
|
||||||
|
|
||||||
// process data read from server, add decompress etc later
|
// process data read from server, add decompress etc later
|
||||||
static void cliHandleResp(SCliConn* conn);
|
static void cliHandleResp(SCliConn* conn);
|
||||||
|
@ -150,12 +148,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
||||||
cliHandleUpdate};
|
cliHandleUpdate};
|
||||||
|
|
||||||
static void cliSendQuit(SCliThrd* thrd);
|
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
|
||||||
static void destroyUserdata(STransMsg* userdata);
|
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||||
|
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
|
||||||
|
|
||||||
static int cliRBChoseIdx(STrans* pTransInst);
|
|
||||||
|
|
||||||
static void destroyCmsg(void* cmsg);
|
|
||||||
static void transDestroyConnCtx(STransConnCtx* ctx);
|
static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
// thread obj
|
// thread obj
|
||||||
static SCliThrd* createThrdObj();
|
static SCliThrd* createThrdObj();
|
||||||
|
@ -885,26 +881,23 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
||||||
}
|
}
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
||||||
if (pCvtAddr->cvt == false) {
|
if (pCvtAddr->cvt == false) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) {
|
if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
|
||||||
if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
|
memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
|
||||||
memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN);
|
memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
|
||||||
memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
||||||
if (code != 0) return false;
|
if (code != 0) return false;
|
||||||
if (pCtx->retryCnt == 0) return false;
|
if (pCtx->retryCnt == 0) return false;
|
||||||
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
||||||
int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
memset(pResp, 0, sizeof(STransMsg));
|
memset(pResp, 0, sizeof(STransMsg));
|
||||||
|
@ -1128,14 +1121,15 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
return cli;
|
return cli;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyUserdata(STransMsg* userdata) {
|
FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
|
||||||
if (userdata->pCont == NULL) {
|
if (userdata->pCont == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transFreeMsg(userdata->pCont);
|
transFreeMsg(userdata->pCont);
|
||||||
userdata->pCont = NULL;
|
userdata->pCont = NULL;
|
||||||
}
|
}
|
||||||
static void destroyCmsg(void* arg) {
|
|
||||||
|
FORCE_INLINE void destroyCmsg(void* arg) {
|
||||||
SCliMsg* pMsg = arg;
|
SCliMsg* pMsg = arg;
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -1220,7 +1214,7 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int cliRBChoseIdx(STrans* pTransInst) {
|
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
int8_t index = pTransInst->index;
|
int8_t index = pTransInst->index;
|
||||||
if (pTransInst->numOfThreads == 0) {
|
if (pTransInst->numOfThreads == 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1230,7 +1224,7 @@ int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
}
|
}
|
||||||
return index % pTransInst->numOfThreads;
|
return index % pTransInst->numOfThreads;
|
||||||
}
|
}
|
||||||
static void doDelayTask(void* param) {
|
static FORCE_INLINE void doDelayTask(void* param) {
|
||||||
STaskArg* arg = param;
|
STaskArg* arg = param;
|
||||||
SCliMsg* pMsg = arg->param1;
|
SCliMsg* pMsg = arg->param1;
|
||||||
SCliThrd* pThrd = arg->param2;
|
SCliThrd* pThrd = arg->param2;
|
||||||
|
@ -1264,13 +1258,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||||
if (*val != exp) {
|
if (*val != exp) {
|
||||||
*val = newVal;
|
*val = newVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
||||||
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
|
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1402,7 +1396,7 @@ void transUnrefCliHandle(void* handle) {
|
||||||
cliDestroyConn((SCliConn*)handle, true);
|
cliDestroyConn((SCliConn*)handle, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
|
static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) {
|
||||||
SCliThrd* pThrd = NULL;
|
SCliThrd* pThrd = NULL;
|
||||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
||||||
if (exh == NULL) {
|
if (exh == NULL) {
|
||||||
|
|
|
@ -424,7 +424,7 @@ void transQueueDestroy(STransQueue* queue) {
|
||||||
taosArrayDestroy(queue->q);
|
taosArrayDestroy(queue->q);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
|
||||||
SDelayTask* arg1 = container_of(a, SDelayTask, node);
|
SDelayTask* arg1 = container_of(a, SDelayTask, node);
|
||||||
SDelayTask* arg2 = container_of(b, SDelayTask, node);
|
SDelayTask* arg2 = container_of(b, SDelayTask, node);
|
||||||
if (arg1->execTime > arg2->execTime) {
|
if (arg1->execTime > arg2->execTime) {
|
||||||
|
|
|
@ -125,17 +125,17 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
|
||||||
static void uvWalkCb(uv_handle_t* handle, void* arg);
|
static void uvWalkCb(uv_handle_t* handle, void* arg);
|
||||||
static void uvFreeCb(uv_handle_t* handle);
|
static void uvFreeCb(uv_handle_t* handle);
|
||||||
|
|
||||||
static void uvStartSendRespImpl(SSvrMsg* smsg);
|
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg);
|
||||||
|
|
||||||
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSvrMsg* msg);
|
static void uvStartSendResp(SSvrMsg* msg);
|
||||||
|
|
||||||
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
||||||
|
|
||||||
static void destroySmsg(SSvrMsg* smsg);
|
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||||
// check whether already read complete packet
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||||
static SSvrConn* createConn(void* hThrd);
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||||
static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
||||||
static void destroyConnRegArg(SSvrConn* conn);
|
|
||||||
|
|
||||||
static int reallocConnRef(SSvrConn* conn);
|
static int reallocConnRef(SSvrConn* conn);
|
||||||
|
|
||||||
|
@ -413,7 +413,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
wb->len = len;
|
wb->len = len;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvStartSendRespImpl(SSvrMsg* smsg) {
|
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
|
||||||
SSvrConn* pConn = smsg->pConn;
|
SSvrConn* pConn = smsg->pConn;
|
||||||
if (pConn->broken) {
|
if (pConn->broken) {
|
||||||
return;
|
return;
|
||||||
|
@ -447,7 +447,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroySmsg(SSvrMsg* smsg) {
|
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
|
||||||
if (smsg == NULL) {
|
if (smsg == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -812,7 +812,7 @@ void* transWorkerThread(void* arg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSvrConn* createConn(void* hThrd) {
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
SWorkThrd* pThrd = hThrd;
|
SWorkThrd* pThrd = hThrd;
|
||||||
|
|
||||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||||
|
@ -842,7 +842,7 @@ static SSvrConn* createConn(void* hThrd) {
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyConn(SSvrConn* conn, bool clear) {
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -854,7 +854,7 @@ static void destroyConn(SSvrConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void destroyConnRegArg(SSvrConn* conn) {
|
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
|
||||||
if (conn->regArg.init == 1) {
|
if (conn->regArg.init == 1) {
|
||||||
transFreeMsg(conn->regArg.msg.pCont);
|
transFreeMsg(conn->regArg.msg.pCont);
|
||||||
conn->regArg.init = 0;
|
conn->regArg.init = 0;
|
||||||
|
|
Loading…
Reference in New Issue