opt parameter

This commit is contained in:
yihaoDeng 2024-09-12 19:37:44 +08:00
parent 2f487130ce
commit 1bac4a8cf0
3 changed files with 68 additions and 82 deletions

View File

@ -94,12 +94,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) #define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
#define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A)
//common & util //common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) // #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //

View File

@ -212,7 +212,7 @@ static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t sugges
// callback after recv nbytes from socket // callback after recv nbytes from socket
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after send data to socket // callback after send data to socket
static void cliSendCb(uv_write_t* req, int status); // static void cliSendCb(uv_write_t* req, int status);
// callback after conn to server // callback after conn to server
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
@ -620,16 +620,16 @@ void cliConnTimeout(uv_timer_t* handle) {
tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
cliResetConnTimer(conn); cliResetConnTimer(conn);
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, UV_ECANCELED); // cliHandleFastFail(conn, UV_ECANCELED);
}
void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb
SCliConn* conn = handle->data;
tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
(void)uv_read_stop(conn->stream);
cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
} }
// void cliReadTimeoutCb(uv_timer_t* handle) {
// // set up timeout cb
// SCliConn* conn = handle->data;
// tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
// (void)uv_read_stop(conn->stream);
// cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
// }
void* createConnPool(int size) { void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
@ -1070,45 +1070,6 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
} }
} }
// static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) {
// if (transQueuePush(&conn->reqsToSend, pReq) != 0) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// return 0;
// }
// static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
// // do nothing
// SCliReq* pTail = transQueuePop(&conn->reqsToSend);
// if (pTail == NULL) {
// return TSDB_CODE_INVALID_PARA;
// }
// if (pReq != NULL) {
// *pReq = pTail;
// }
// return 0;
// }
// static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) {
// int32_t code = 0;
// if (pReq->msg.info.handle == 0) {
// return 0;
// }
// queue q;
// QUEUE_INIT(&q);
// queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t));
// if (p == NULL) {
// QUEUE_PUSH(&q, &pReq->qlist);
// code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue));
// if (code != 0) {
// return code;
// }
// } else {
// QUEUE_PUSH(p, &pReq->qlist);
// }
// return 0;
// }
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
int32_t code = 0; int32_t code = 0;
SCliConn* pConn = NULL; SCliConn* pConn = NULL;
@ -1291,37 +1252,44 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn); taosMemoryFree(conn);
} }
bool filterAllReq(void* e, void* arg) { return 1; }
static void cliHandleBatch_shareConnExcept(SCliConn* conn) { static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
while (!transQueueEmpty(&conn->reqsToSend)) {
queue* el = transQueuePop(&conn->reqsToSend); queue set;
transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1);
transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
// ASSERT(pReq->type != Release);
// ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0);
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0}; STransMsg resp = {0};
resp.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL);
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
resp.info.ahandle = NULL;
resp.info.cliVer = pInst->compatibilityVer; resp.info.cliVer = pInst->compatibilityVer;
resp.info.ahandle = pCtx->ahandle; resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
// not
if (pCtx == NULL) {
destroyReq(pReq);
continue;
}
pReq->seq = 0; pReq->seq = 0;
code = cliNotifyCb(conn, pReq, &resp); code = cliNotifyCb(conn, pReq, &resp);
if (code != 0) { if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
continue; continue;
} else { } else {
// already notify user // already notify user
destroyReq(pReq); destroyReq(pReq);
} }
} }
if (TRANS_CONN_REF_GET(conn) > 1) transUnrefCliHandle(conn);
transUnrefCliHandle(conn);
} }
bool fileToRmReq(void* h, void* arg) { bool fileToRmReq(void* h, void* arg) {
@ -1611,6 +1579,23 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) {
return 0; return 0;
}; };
static int32_t cliBuildExeceptMsg(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
memset(pResp, 0, sizeof(STransMsg));
STransMsg resp = {0};
resp.contLen = 0;
resp.pCont = NULL;
resp.msgType = pReq->msg.msgType + 1;
resp.info.ahandle = pReq->ctx->ahandle;
resp.info.traceId = pReq->msg.info.traceId;
resp.info.hasEpSet = false;
resp.info.cliVer = pInst->compatibilityVer;
return 0;
}
bool filteGetAll(void* q, void* arg) { return true; }
void cliConnCb(uv_connect_t* req, int status) { void cliConnCb(uv_connect_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
@ -1627,16 +1612,16 @@ void cliConnCb(uv_connect_t* req, int status) {
if (status != 0) { if (status != 0) {
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
uv_strerror(status)); uv_strerror(status));
// handle err // queue set;
// 1. update statis // transQueueRemoveByFilter(&pConn->reqsToSend, filteGetAll, NULL, &set, -1);
// 2. notifyCb or retry // transQueueRemoveByFilter(&pConn->reqsSentOut, filteGetAll, NULL, &set, -1);
// 3. clear conn and // while (!QUEUE_IS_EMPTY(&set)) {
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); // queue* el = QUEUE_HEAD(&set);
// if (timeout == false) { // SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
// cliHandleFastFail(pConn, status); // cliBuildExeceptMsg(pConn, pReq, &pReq->msg);
// } else if (timeout == true) { // destroyReq(pReq);
// // already deal by timeout
// } // }
cliHandleExcept(pConn, status);
return; return;
} }
pConn->connnected = 1; pConn->connnected = 1;
@ -1894,9 +1879,12 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
} }
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState == NULL) { if (pState == NULL) {
if (pReq->ctx == NULL || pReq->ctx->ahandle == 0) {
return TSDB_CODE_RPC_STATE_DROPED;
}
tDebug("failed to get statue, qid:%ld", qid); tDebug("failed to get statue, qid:%ld", qid);
// ASSERT(0);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS; return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else { } else {
*pConn = pState->conn; *pConn = pState->conn;
@ -1939,6 +1927,11 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
code = cliMayGetStateByQid(pThrd, pReq, &pConn); code = cliMayGetStateByQid(pThrd, pReq, &pConn);
if (code == 0) { if (code == 0) {
(void)clConnMayUpdateReqCtx(pConn, pReq); (void)clConnMayUpdateReqCtx(pConn, pReq);
} else if (code == TSDB_CODE_RPC_STATE_DROPED) {
STraceId* trace = &pReq->msg.info.traceId;
tWarn("%s failed to get statue, qid:%ld", pInst->label, pReq->msg.info.qId);
destroyReq(pReq);
return;
} }
if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
@ -2217,6 +2210,7 @@ static FORCE_INLINE void destroyReq(void* arg) {
if (pReq == NULL) { if (pReq == NULL) {
return; return;
} }
tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
if (pReq->ctx) destroyReqCtx(pReq->ctx); if (pReq->ctx) destroyReqCtx(pReq->ctx);
@ -2802,7 +2796,6 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false); noDelay = cliResetEpset(pCtx, pResp, false);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
// transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
@ -2810,16 +2803,13 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
// addConnToPool(pThrd->pool, pConn);
} else if (code == TSDB_CODE_SYN_RESTORING) { } else if (code == TSDB_CODE_SYN_RESTORING) {
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
// addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
} else { } else {
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false); noDelay = cliResetEpset(pCtx, pResp, false);
// addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
} }
if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {

View File

@ -61,6 +61,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already qu
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")