Merge branch '3.0' of github.com:taosdata/TDengine into test/chr/TD-14699
This commit is contained in:
commit
b61297f0d5
|
@ -55,8 +55,12 @@ extern int32_t tMsgDict[];
|
||||||
|
|
||||||
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
|
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
|
||||||
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
|
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
|
||||||
#define TMSG_INFO(TYPE) \
|
#define TMSG_INFO(TYPE) \
|
||||||
(((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0)
|
((TYPE) >= 0 && \
|
||||||
|
((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \
|
||||||
|
(TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \
|
||||||
|
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
|
||||||
|
: 0
|
||||||
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
|
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
|
||||||
|
|
||||||
typedef uint16_t tmsg_t;
|
typedef uint16_t tmsg_t;
|
||||||
|
|
|
@ -82,6 +82,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
|
||||||
|
@ -164,6 +165,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
||||||
|
@ -198,6 +200,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
|
||||||
|
@ -209,6 +212,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
|
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||||
|
@ -217,6 +221,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_MON_MSG)
|
TD_NEW_MSG_SEG(TDMT_MON_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL)
|
||||||
|
@ -227,6 +232,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
|
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
|
||||||
|
@ -251,6 +257,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
||||||
|
|
||||||
#if defined(TD_MSG_NUMBER_)
|
#if defined(TD_MSG_NUMBER_)
|
||||||
TDMT_MAX
|
TDMT_MAX
|
||||||
|
|
|
@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
||||||
dmCleanupClient(pDnode);
|
dmCleanupClient(pDnode);
|
||||||
dmCleanupServer(pDnode);
|
dmCleanupServer(pDnode);
|
||||||
dmClearVars(pDnode);
|
dmClearVars(pDnode);
|
||||||
|
rpcCleanup();
|
||||||
dDebug("dnode is closed, ptr:%p", pDnode);
|
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,13 +48,19 @@ static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
|
||||||
return TIME_UNIT_INVALID;
|
return TIME_UNIT_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_TIME_PRECISION_MILLI == dbPrec && 0 == strcasecmp(pVal->literal, "1u")) {
|
if (TSDB_TIME_PRECISION_MILLI == dbPrec && (0 == strcasecmp(pVal->literal, "1u") ||
|
||||||
|
0 == strcasecmp(pVal->literal, "1b"))) {
|
||||||
return TIME_UNIT_TOO_SMALL;
|
return TIME_UNIT_TOO_SMALL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVal->literal[0] != '1' ||
|
if (TSDB_TIME_PRECISION_MICRO == dbPrec && 0 == strcasecmp(pVal->literal, "1b")) {
|
||||||
(pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
|
return TIME_UNIT_TOO_SMALL;
|
||||||
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w')) {
|
}
|
||||||
|
|
||||||
|
if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' &&
|
||||||
|
pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
|
||||||
|
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' &&
|
||||||
|
pVal->literal[1] != 'w' && pVal->literal[1] != 'b')) {
|
||||||
return TIME_UNIT_INVALID;
|
return TIME_UNIT_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -700,9 +706,8 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
"ELAPSED function time unit parameter should be greater than db precision");
|
"ELAPSED function time unit parameter should be greater than db precision");
|
||||||
} else if (ret == TIME_UNIT_INVALID) {
|
} else if (ret == TIME_UNIT_INVALID) {
|
||||||
return buildFuncErrMsg(
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
"ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
||||||
"ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1223,9 +1228,8 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
"STATEDURATION function time unit parameter should be greater than db precision");
|
"STATEDURATION function time unit parameter should be greater than db precision");
|
||||||
} else if (ret == TIME_UNIT_INVALID) {
|
} else if (ret == TIME_UNIT_INVALID) {
|
||||||
return buildFuncErrMsg(
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
"STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
||||||
"STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1735,9 +1739,8 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
"TIMETRUNCATE function time unit parameter should be greater than db precision");
|
"TIMETRUNCATE function time unit parameter should be greater than db precision");
|
||||||
} else if (ret == TIME_UNIT_INVALID) {
|
} else if (ret == TIME_UNIT_INVALID) {
|
||||||
return buildFuncErrMsg(
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
||||||
"TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
@ -1775,9 +1778,8 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
"TIMEDIFF function time unit parameter should be greater than db precision");
|
"TIMEDIFF function time unit parameter should be greater than db precision");
|
||||||
} else if (ret == TIME_UNIT_INVALID) {
|
} else if (ret == TIME_UNIT_INVALID) {
|
||||||
return buildFuncErrMsg(
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
"TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
||||||
"TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -490,18 +490,7 @@ static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicN
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
|
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
|
||||||
switch (nodeType(pChild)) {
|
return pushDownCondOptAppendCond(&pChild->pConditions, pCond);
|
||||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
|
||||||
return pushDownCondOptPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond);
|
|
||||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
|
||||||
return pushDownCondOptPushCondToProject(pCxt, (SProjectLogicNode*)pChild, pCond);
|
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
|
||||||
return pushDownCondOptPushCondToJoin(pCxt, (SJoinLogicNode*)pChild, pCond);
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
planError("pushDownCondOptPushCondToChild failed, invalid logic plan node %s", nodesNodeName(nodeType(pChild)));
|
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
|
static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
|
||||||
|
@ -802,10 +791,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
// TODO: remove it after full implementation of pushing down to child
|
// TODO: remove it after full implementation of pushing down to child
|
||||||
if (1 != LIST_LENGTH(pAgg->node.pChildren) ||
|
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
|
||||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) &&
|
|
||||||
QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) &&
|
|
||||||
QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -832,6 +818,77 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SRewriteProjCondContext {
|
||||||
|
SProjectLogicNode* pProj;
|
||||||
|
int32_t errCode;
|
||||||
|
}SRewriteProjCondContext;
|
||||||
|
|
||||||
|
static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) {
|
||||||
|
SRewriteProjCondContext* pCxt = pContext;
|
||||||
|
SProjectLogicNode* pProj = pCxt->pProj;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(*ppNode)) {
|
||||||
|
SNode* pTarget = NULL;
|
||||||
|
FOREACH(pTarget, pProj->node.pTargets) {
|
||||||
|
if (nodesEqualNode(pTarget, *ppNode)) {
|
||||||
|
SNode* pProjection = NULL;
|
||||||
|
FOREACH(pProjection, pProj->pProjections) {
|
||||||
|
if (0 == strcmp(((SExprNode*)pProjection)->aliasName, ((SColumnNode*)(*ppNode))->colName)) {
|
||||||
|
SNode* pExpr = nodesCloneNode(pProjection);
|
||||||
|
if (pExpr == NULL) {
|
||||||
|
pCxt->errCode = terrno;
|
||||||
|
return DEAL_RES_ERROR;
|
||||||
|
}
|
||||||
|
nodesDestroyNode(*ppNode);
|
||||||
|
*ppNode = pExpr;
|
||||||
|
} // end if expr alias name equal column name
|
||||||
|
} // end for each project
|
||||||
|
} // end if target node equals cond column node
|
||||||
|
} // end for each targets
|
||||||
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) {
|
||||||
|
SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS};
|
||||||
|
SNode* pProjectCond = pProject->node.pConditions;
|
||||||
|
nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt);
|
||||||
|
*ppProjectCond = pProjectCond;
|
||||||
|
pProject->node.pConditions = NULL;
|
||||||
|
return cxt.errCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject) {
|
||||||
|
if (NULL == pProject->node.pConditions ||
|
||||||
|
OPTIMIZE_FLAG_TEST_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
// TODO: remove it after full implementation of pushing down to child
|
||||||
|
if (1 != LIST_LENGTH(pProject->node.pChildren)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL != pProject->node.pLimit || NULL != pProject->node.pSlimit) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SNode* pProjCond = NULL;
|
||||||
|
code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
|
||||||
|
code = pushDownCondOptPushCondToChild(pCxt, pChild, &pProjCond);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
||||||
|
pCxt->optimized = true;
|
||||||
|
} else {
|
||||||
|
nodesDestroyNode(pProjCond);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch (nodeType(pLogicNode)) {
|
switch (nodeType(pLogicNode)) {
|
||||||
|
@ -844,6 +901,9 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode);
|
code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||||
|
code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,3 +81,8 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
|
||||||
run("SELECT c1 FROM st1s3");
|
run("SELECT c1 FROM st1s3");
|
||||||
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
|
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PlanOptimizeTest, pushDownProjectCond) {
|
||||||
|
useDb("root", "test");
|
||||||
|
run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first");
|
||||||
|
}
|
|
@ -1174,7 +1174,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
|
||||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||||
|
|
||||||
timeUnit = timeUnit * 1000 / factor;
|
int64_t unit = timeUnit * 1000 / factor;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
||||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||||
|
@ -1209,12 +1209,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
|
||||||
NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf);
|
NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf);
|
||||||
int32_t tsDigits = (int32_t)strlen(buf);
|
int32_t tsDigits = (int32_t)strlen(buf);
|
||||||
|
|
||||||
switch (timeUnit) {
|
switch (unit) {
|
||||||
case 0: { /* 1u */
|
case 0: { /* 1u or 1b */
|
||||||
if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
||||||
timeVal = timeVal / 1000 * 1000;
|
if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) {
|
||||||
//} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
|
timeVal = timeVal * 1;
|
||||||
// //timeVal = timeVal / 1000;
|
} else {
|
||||||
|
timeVal = timeVal / 1000 * 1000;
|
||||||
|
}
|
||||||
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
|
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
|
||||||
timeVal = timeVal * factor;
|
timeVal = timeVal * factor;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1366,8 +1368,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||||
|
|
||||||
timeUnit = timeUnit * 1000 / factor;
|
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
for (int32_t i = 0; i < inputNum; ++i) {
|
for (int32_t i = 0; i < inputNum; ++i) {
|
||||||
if (pInput[i].numOfRows > numOfRows) {
|
if (pInput[i].numOfRows > numOfRows) {
|
||||||
|
@ -1447,9 +1447,14 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switch(timeUnit) {
|
int64_t unit = timeUnit * 1000 / factor;
|
||||||
case 0: { /* 1u */
|
switch(unit) {
|
||||||
result = result / 1000;
|
case 0: { /* 1u or 1b */
|
||||||
|
if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) {
|
||||||
|
result = result / 1;
|
||||||
|
} else {
|
||||||
|
result = result / 1000;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 1: { /* 1a */
|
case 1: { /* 1a */
|
||||||
|
|
|
@ -96,8 +96,8 @@ typedef void* queue[2];
|
||||||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||||
|
|
||||||
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
||||||
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
||||||
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
typedef SRpcCtx STransCtx;
|
typedef SRpcCtx STransCtx;
|
||||||
|
@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
|
||||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
|
||||||
#define rpcIsReq(type) (type & 1U)
|
#define rpcIsReq(type) (type & 1U)
|
||||||
|
|
||||||
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
|
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
|
||||||
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
|
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
|
||||||
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
||||||
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
||||||
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
||||||
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
||||||
#define transIsReq(type) (type & 1U)
|
#define transIsReq(type) (type & 1U)
|
||||||
|
|
||||||
#define transLabel(trans) ((STrans*)trans)->label
|
#define transLabel(trans) ((STrans*)trans)->label
|
||||||
|
|
||||||
|
|
|
@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
tDebug("conn %p acquired by server app", pConn);
|
tDebug("conn %p acquired by server app", pConn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
STraceId* trace = &pHead->traceId;
|
STraceId* trace = &pHead->traceId;
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
|
|
||||||
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn,
|
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pTransInst), pConn,
|
||||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
|
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
|
||||||
} else {
|
} else {
|
||||||
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn),
|
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d",
|
||||||
pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port),
|
||||||
transMsg.code);
|
transMsg.contLen, pHead->noResp, transMsg.code);
|
||||||
// no ref here
|
// no ref here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
transMsg.info.refId = pConn->refId;
|
transMsg.info.refId = pConn->refId;
|
||||||
transMsg.info.traceId = pHead->traceId;
|
transMsg.info.traceId = pHead->traceId;
|
||||||
|
|
||||||
tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn,
|
tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pTransInst), transMsg.info.handle,
|
||||||
pConn->refId);
|
pConn, pConn->refId);
|
||||||
assert(transMsg.info.handle != NULL);
|
assert(transMsg.info.handle != NULL);
|
||||||
|
|
||||||
if (pHead->noResp == 1) {
|
if (pHead->noResp == 1) {
|
||||||
|
@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
||||||
|
|
||||||
STrans* pTransInst = pConn->pTransInst;
|
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||||
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||||
}
|
}
|
||||||
|
@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
// opt
|
// opt
|
||||||
SSvrConn* conn = cli->data;
|
SSvrConn* conn = cli->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
|
STrans* pTransInst = conn->pTransInst;
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread);
|
tTrace("%s conn %p total read: %d, current read: %d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
|
||||||
if (transReadComplete(pBuf)) {
|
if (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn);
|
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||||
uvHandleReq(conn);
|
uvHandleReq(conn);
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn);
|
tTrace("%s conn %p read partial packet, continue to read", transLabel(pTransInst), conn);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread));
|
tError("%s conn %p read error: %s", transLabel(pTransInst), conn, uv_err_name(nread));
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
conn->broken = true;
|
conn->broken = true;
|
||||||
if (conn->status == ConnAcquire) {
|
if (conn->status == ConnAcquire) {
|
||||||
if (conn->regArg.init) {
|
if (conn->regArg.init) {
|
||||||
tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn);
|
tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn);
|
||||||
STrans* pTransInst = conn->pTransInst;
|
STrans* pTransInst = conn->pTransInst;
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
|
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
|
||||||
memset(&conn->regArg, 0, sizeof(conn->regArg));
|
memset(&conn->regArg, 0, sizeof(conn->regArg));
|
||||||
|
@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
|
|
||||||
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
STraceId* trace = &pMsg->info.traceId;
|
STraceId* trace = &pMsg->info.traceId;
|
||||||
tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn,
|
tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pTransInst), pConn,
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
|
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
|
@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) {
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||||
transAcquireExHandle(transGetRefMgt(), exh->refId);
|
transAcquireExHandle(transGetRefMgt(), exh->refId);
|
||||||
|
|
||||||
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
pConn->refId = exh->refId;
|
pConn->refId = exh->refId;
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId);
|
tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pTransInst), exh, pConn, pConn->refId);
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||||
|
|
||||||
tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
|
STrans* pTransInst = thrd->pTransInst;
|
||||||
|
tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
|
||||||
|
|
||||||
|
for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) {
|
||||||
|
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
|
||||||
|
destroySmsg(msg);
|
||||||
|
}
|
||||||
transQueueDestroy(&conn->srvMsgs);
|
transQueueDestroy(&conn->srvMsgs);
|
||||||
|
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
|
@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) {
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Register;
|
m->type = Register;
|
||||||
|
|
||||||
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
||||||
transAsyncSend(pThrd->asyncPool, &m->q);
|
transAsyncSend(pThrd->asyncPool, &m->q);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -24,7 +24,8 @@ class TDTestCase:
|
||||||
]
|
]
|
||||||
self.db_param_precision = ['ms','us','ns']
|
self.db_param_precision = ['ms','us','ns']
|
||||||
self.time_unit = ['1w','1d','1h','1m','1s','1a','1u']
|
self.time_unit = ['1w','1d','1h','1m','1s','1a','1u']
|
||||||
self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1']
|
#self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1']
|
||||||
|
self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1']
|
||||||
self.ntbname = 'ntb'
|
self.ntbname = 'ntb'
|
||||||
self.stbname = 'stb'
|
self.stbname = 'stb'
|
||||||
self.ctbname = 'ctb'
|
self.ctbname = 'ctb'
|
||||||
|
|
Loading…
Reference in New Issue