[td-255] refactor
This commit is contained in:
parent
fa72b565a9
commit
a3a274d0c1
|
@ -331,12 +331,37 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
.handle = NULL,
|
.handle = NULL,
|
||||||
.code = 0
|
.code = 0
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
|
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle three situation
|
||||||
|
// 1. epset retry, only return last failure ep
|
||||||
|
// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn
|
||||||
|
// 3. other situation, no expected
|
||||||
|
void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) {
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
|
char* msgBuf = tscGetErrorMsgPayload(pCmd);
|
||||||
|
|
||||||
|
if (pEpSet) {
|
||||||
|
sprintf(msgBuf, "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]);
|
||||||
|
} else if (pCmd->command >= TSDB_SQL_MGMT) {
|
||||||
|
SRpcEpSet tEpset;
|
||||||
|
|
||||||
|
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
|
||||||
|
taosCorBeginRead(&pCorEpSet->version);
|
||||||
|
tEpset = pCorEpSet->epSet;
|
||||||
|
taosCorEndRead(&pCorEpSet->version);
|
||||||
|
|
||||||
|
sprintf(msgBuf, "%s\"%s\"", tstrerror(pRes->code),tEpset.fqdn[(tEpset.inUse)%(tEpset.numOfEps)]);
|
||||||
|
} else {
|
||||||
|
sprintf(msgBuf, "%s", tstrerror(pRes->code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
||||||
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
|
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
|
||||||
|
@ -499,26 +524,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
||||||
if (pRes->code == TSDB_CODE_RPC_FQDN_ERROR) {
|
if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) {
|
||||||
tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64);
|
tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64);
|
||||||
// handle three situation
|
tscSetFqdnErrorMsg(pSql, pEpSet);
|
||||||
// 1. epset retry, only return last failure ep
|
|
||||||
// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn
|
|
||||||
// 3. other situation, no expected
|
|
||||||
if (pEpSet) {
|
|
||||||
sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]);
|
|
||||||
} else if (pCmd->command >= TSDB_SQL_MGMT) {
|
|
||||||
SRpcEpSet tEpset;
|
|
||||||
|
|
||||||
SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet;
|
|
||||||
taosCorBeginRead(&pCorEpSet->version);
|
|
||||||
tEpset = pCorEpSet->epSet;
|
|
||||||
taosCorEndRead(&pCorEpSet->version);
|
|
||||||
|
|
||||||
sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),tEpset.fqdn[(tEpset.inUse)%(tEpset.numOfEps)]);
|
|
||||||
} else {
|
|
||||||
sprintf(tscGetErrorMsgPayload(pCmd), "%s", tstrerror(pRes->code));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
||||||
|
|
Loading…
Reference in New Issue