refactor transport

This commit is contained in:
yihaoDeng 2024-08-20 11:47:07 +08:00
parent ef3293f316
commit 58569208d5
2 changed files with 133 additions and 126 deletions

View File

@ -173,8 +173,8 @@ static void cliSendCb(uv_write_t* req, int status);
// callback after conn to server // callback after conn to server
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle); // static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle); // static void cliPrepareCb(uv_prepare_t* handle);
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
static void cliSendBatchCb(uv_write_t* req, int status); static void cliSendBatchCb(uv_write_t* req, int status);
@ -185,7 +185,7 @@ static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static int32_t allocConnRef(SCliConn* conn, bool update); static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
void cliResetConnTimer(SCliConn* conn); void cliResetConnTimer(SCliConn* conn);
static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn);
@ -200,10 +200,11 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs
static void cliDestroyBatch(SCliBatch* pBatch); static void cliDestroyBatch(SCliBatch* pBatch);
// cli util func // cli util func
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code);
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr);
static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
@ -216,7 +217,7 @@ static void cliHandleExcept(SCliConn* conn, int32_t code);
static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status); static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code);
// handle req from app // handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
@ -240,7 +241,7 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn); static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
// thread obj // thread obj
static int32_t createThrdObj(void* trans, SCliThrd** pThrd); static int32_t createThrdObj(void* trans, SCliThrd** pThrd);
@ -260,11 +261,11 @@ typedef struct {
} SHeap; } SHeap;
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
int transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
void transHeapDestroy(SHeap* heap); void transHeapDestroy(SHeap* heap);
int transHeapGet(SHeap* heap, SCliConn** p); int32_t transHeapGet(SHeap* heap, SCliConn** p);
int transHeapInsert(SHeap* heap, SCliConn* p); int32_t transHeapInsert(SHeap* heap, SCliConn* p);
int transHeapDelete(SHeap* heap, SCliConn* p); int32_t transHeapDelete(SHeap* heap, SCliConn* p);
#define CLI_RELEASE_UV(loop) \ #define CLI_RELEASE_UV(loop) \
do { \ do { \
@ -437,7 +438,7 @@ bool cliShouldAddConnToPool(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
bool empty = transQueueEmpty(&conn->cliMsgs); bool empty = transQueueEmpty(&conn->cliMsgs);
if (empty) { if (empty) {
delConnFromHeapCache(pThrd->connHeapCache, conn); (void)delConnFromHeapCache(pThrd->connHeapCache, conn);
} }
return empty; return empty;
@ -478,7 +479,7 @@ void cliHandleResp_shareConn(SCliConn* conn) {
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
int32_t ret = cliAppCb(conn, &transMsg, pMsg); int32_t ret = cliNotifyCb(conn, &transMsg, pMsg);
if (ret != 0) { if (ret != 0) {
return; return;
} else { } else {
@ -577,7 +578,7 @@ void cliHandleResp(SCliConn* conn) {
} }
if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
if (cliAppCb(conn, &transMsg, pMsg) != 0) { if (cliNotifyCb(conn, &transMsg, pMsg) != 0) {
return; return;
} }
} }
@ -675,7 +676,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle));
cliDestroyMsgInExhandle(refId); cliDestroyMsgInExhandle(refId);
if (cliAppCb(pConn, &transMsg, pMsg) != 0) { if (cliNotifyCb(pConn, &transMsg, pMsg) != 0) {
return; return;
} }
} }
@ -735,7 +736,7 @@ void* destroyConnPool(SCliThrd* pThrd) {
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL; pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
} }
taosMemoryFree(msglist); taosMemoryFree(msglist);
@ -817,14 +818,14 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pMsg)->msg.msgType))) { if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pMsg)->msg.msgType))) {
tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pMsg)->msg.msgType), tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pMsg)->msg.msgType),
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); doNotifyCb(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
*pMsg = NULL; *pMsg = NULL;
return NULL; return NULL;
} }
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) { if (arg == NULL) {
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL; *pMsg = NULL;
return NULL; return NULL;
} }
@ -834,7 +835,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
if (task == NULL) { if (task == NULL) {
taosMemoryFree(arg); taosMemoryFree(arg);
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL; *pMsg = NULL;
return NULL; return NULL;
} }
@ -847,7 +848,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) { if (arg == NULL) {
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL; *pMsg = NULL;
return NULL; return NULL;
} }
@ -857,7 +858,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn); SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
if (task == NULL) { if (task == NULL) {
taosMemoryFree(arg); taosMemoryFree(arg);
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); doNotifyCb(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL; *pMsg = NULL;
return NULL; return NULL;
} }
@ -1255,7 +1256,7 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
transMsg.info.ahandle = pCtx->ahandle; transMsg.info.ahandle = pCtx->ahandle;
pMsg->seqNum = 0; pMsg->seqNum = 0;
code = cliAppCb(conn, &transMsg, pMsg); code = cliNotifyCb(conn, &transMsg, pMsg);
if (code != 0) { if (code != 0) {
continue; continue;
} else { } else {
@ -1752,7 +1753,7 @@ void cliConnCb(uv_connect_t* req, int status) {
return cliSend(pConn); return cliSend(pConn);
} }
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { static void doNotifyCb(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
@ -1891,14 +1892,22 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr)
} }
return conn; return conn;
} }
FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) {
if (pCvtAddr->cvt == false) { if (pCvtAddr->cvt == false) {
return; if (EPSET_IS_VALID(pEpSet)) {
return 0;
} else {
return TSDB_CODE_RPC_FQDN_ERROR;
}
} }
if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) {
memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN); memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN);
memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN);
} }
if (EPSET_IS_VALID(pEpSet)) {
return 0;
}
return TSDB_CODE_RPC_FQDN_ERROR;
} }
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
@ -1906,11 +1915,10 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true;
} }
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
// memset(pResp, 0, sizeof(STransMsg));
if (pResp->code == 0) { if (pResp->code == 0) {
pResp->code = TSDB_CODE_RPC_BROKEN_LINK; pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
} }
@ -1921,6 +1929,20 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
return 0; return 0;
} }
FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliMsg* pMsg, int32_t code) {
STrans* pInst = pThrd->pInst;
STransMsg resp = {.code = code};
code = cliBuildExceptResp(pMsg, &resp);
if (code != 0) {
return code;
}
resp.info.cliVer = pInst->compatibilityVer;
pInst->cfp(pInst->parent, &resp, NULL);
return 0;
}
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) {
int32_t code = 0; int32_t code = 0;
uint32_t addr = 0; uint32_t addr = 0;
@ -1987,7 +2009,7 @@ static void doFreeTimeoutMsg(void* param) {
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pInst->label, TMSG_INFO(pMsg->msg.msgType)); tGTrace("%s msg %s cannot get available conn after timeout", pInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); doNotifyCb(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
@ -2044,28 +2066,33 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
} }
return transHeapInsert(p, pConn); return transHeapInsert(p, pConn);
} }
static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr)); SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr));
if (p == NULL) { if (p == NULL) {
tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr); tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr);
return; return 0;
} }
int32_t code = transHeapDelete(p, pConn); int32_t code = transHeapDelete(p, pConn);
if (code != 0) { if (code != 0) {
tDebug("failed to delete conn %p from heap cache since %s", pConn, tstrerror(code)); tDebug("failed to delete conn %p from heap cache since %s", pConn, tstrerror(code));
} }
return code;
} }
void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
int32_t code = 0; int32_t code = 0;
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { if (code != 0) {
// notifyCb
destroyCmsg(pMsg); destroyCmsg(pMsg);
return; return;
} }
char addr[TSDB_FQDN_LEN + 64] = {0}; char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
@ -2093,18 +2120,14 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
transQueuePush(&pConn->cliMsgs, pMsg); transQueuePush(&pConn->cliMsgs, pMsg);
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
int32_t code = 0; void cliHandleReq__noShareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
int32_t code;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
code = cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (pInst->shareConn == 1) { if (code != 0) {
return cliHandleReq__shareConn(pMsg, pThrd); // notifyCb
}
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
destroyCmsg(pMsg); destroyCmsg(pMsg);
return;
} }
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
@ -2117,12 +2140,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
if (ignore == true) { if (ignore == true) {
// persist conn already release by server // persist conn already release by server
STransMsg resp = {0}; STransMsg resp = {0};
(void)cliBuildExceptResp(pMsg, &resp);
// refactorr later
resp.info.cliVer = pInst->compatibilityVer;
if (pMsg->type != Release) { if (pMsg->type != Release) {
pInst->cfp(pInst->parent, &resp, NULL); (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, 0);
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
return; return;
@ -2140,13 +2159,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
code = cliCreateConn(pThrd, &conn); code = cliCreateConn(pThrd, &conn);
if (code != 0) { if (code != 0) {
tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code)); tError("%s failed to create conn, reason:%s", pInst->label, tstrerror(code));
STransMsg resp = {.code = code}; (void)cliBuildExceptRespAndNotifyCb(pThrd, pMsg, code);
(void)cliBuildExceptResp(pMsg, &resp);
resp.info.cliVer = pInst->compatibilityVer;
if (pMsg->type != Release) {
pInst->cfp(pInst->parent, &resp, NULL);
}
destroyCmsg(pMsg); destroyCmsg(pMsg);
return; return;
} }
@ -2207,6 +2220,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
tGTrace("%s conn %p ready", pInst->label, conn); tGTrace("%s conn %p ready", pInst->label, conn);
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pInst = pThrd->pInst;
if (pInst->shareConn == 1) {
return cliHandleReq__shareConn(pMsg, pThrd);
} else {
return cliHandleReq__noShareConn(pMsg, pThrd);
}
}
static void cliDealReq(queue* wq, SCliThrd* pThrd) { static void cliDealReq(queue* wq, SCliThrd* pThrd) {
int count = 0; int count = 0;
@ -2243,6 +2265,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) { static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) {
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
return;
} }
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
@ -2316,11 +2339,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
// if (pMsg->type == Normal) {
// cliBuildBatch(pMsg, h, pThrd);
// continue;
// // count++;
// }
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
cliBuildBatch(pMsg, h, pThrd); cliBuildBatch(pMsg, h, pThrd);
continue; continue;
@ -2342,6 +2360,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
destroyCmsg(pMsg); destroyCmsg(pMsg);
continue; continue;
} }
pBatchList->batchLenLimit = pInst->batchSize; pBatchList->batchLenLimit = pInst->batchSize;
SCliBatch* pBatch = NULL; SCliBatch* pBatch = NULL;
@ -2416,33 +2435,6 @@ static void cliAsyncCb(uv_async_t* handle) {
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
} }
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
(void)taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
(void)taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) { void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
@ -2464,12 +2456,9 @@ void cliConnFreeMsgs(SCliConn* conn) {
continue; continue;
} }
STransMsg resp = {0}; if (cliBuildExceptRespAndNotifyCb(pThrd, cmsg, 0) != 0) {
if (-1 == cliBuildExceptResp(cmsg, &resp)) {
continue; continue;
} }
resp.info.cliVer = pInst->compatibilityVer;
pInst->cfp(pInst->parent, &resp, NULL);
cmsg->ctx->ahandle = NULL; cmsg->ctx->ahandle = NULL;
} }
@ -2637,19 +2626,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(code, NULL, _end); TAOS_CHECK_GOTO(code, NULL, _end);
} }
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
if (pThrd->prepare == NULL) {
tError("failed to create prepre since:%s", tstrerror(code));
TAOS_CHECK_GOTO(code, NULL, _end);
}
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
if (code != 0) {
tError("failed to create prepre since:%s", uv_err_name(code));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
}
pThrd->prepare->data = pThrd;
int32_t timerSize = 64; int32_t timerSize = 64;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
if (pThrd->timerList == NULL) { if (pThrd->timerList == NULL) {
@ -2716,7 +2692,6 @@ _end:
if (pThrd) { if (pThrd) {
(void)uv_loop_close(pThrd->loop); (void)uv_loop_close(pThrd->loop);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd->prepare);
(void)taosThreadMutexDestroy(&pThrd->msgMtx); (void)taosThreadMutexDestroy(&pThrd->msgMtx);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
@ -2840,7 +2815,7 @@ static FORCE_INLINE void doDelayTask(void* param) {
taosMemoryFree(arg); taosMemoryFree(arg);
} }
static void doCloseIdleConn(void* param) { static FORCE_INLINE void doCloseIdleConn(void* param) {
STaskArg* arg = param; STaskArg* arg = param;
SCliConn* conn = arg->param1; SCliConn* conn = arg->param1;
tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
@ -2848,7 +2823,7 @@ static void doCloseIdleConn(void* param) {
cliDestroyConn(conn, true); cliDestroyConn(conn, true);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
static void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) { static FORCE_INLINE void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) {
if (!(rpcDebugFlag & DEBUG_DEBUG)) { if (!(rpcDebugFlag & DEBUG_DEBUG)) {
return; return;
} }
@ -2856,21 +2831,30 @@ static void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) {
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0}; char tbuf[512] = {0};
(void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
pCtx->retryNextInterval); pCtx->retryNextInterval);
return; return;
} }
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
cliPerfLog_schedMsg(pMsg, transLabel(pThrd->pInst)); cliPerfLog_schedMsg(pMsg, transLabel(pThrd->pInst));
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
(void)transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); SDelayTask* pTask = transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
if (pTask == NULL) {
taosMemoryFree(arg);
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
} }
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
@ -2963,18 +2947,17 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
} }
return noDelay; return noDelay;
} }
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
STransConnCtx* pCtx = pMsg->ctx; int8_t cliRetryShouldRetry(STrans* pInst, STransMsg* pResp) {
int32_t code = pResp->code; bool retry = pInst->retry != NULL ? pInst->retry(pResp->code, pResp->msgType - 1) : false;
bool retry = pInst->retry != NULL ? pInst->retry(code, pResp->msgType - 1) : false;
if (retry == false) { if (retry == false) {
return false; return 0;
} }
return 1;
}
void cliRetryMayInitCtx(STrans* pInst, SCliMsg* pMsg) {
STransConnCtx* pCtx = pMsg->ctx;
if (!pCtx->retryInit) { if (!pCtx->retryInit) {
pCtx->retryMinInterval = pInst->retryMinInterval; pCtx->retryMinInterval = pInst->retryMinInterval;
pCtx->retryMaxInterval = pInst->retryMaxInterval; pCtx->retryMaxInterval = pInst->retryMaxInterval;
@ -2985,15 +2968,32 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->retryStep = 0; pCtx->retryStep = 0;
pCtx->retryInit = true; pCtx->retryInit = true;
pCtx->retryCode = TSDB_CODE_SUCCESS; pCtx->retryCode = TSDB_CODE_SUCCESS;
// already retry, not use handle specified by app;
pMsg->msg.info.handle = 0; pMsg->msg.info.handle = 0;
} }
}
int32_t cliRetryIsTimeout(STrans* pInst, SCliMsg* pMsg) {
STransConnCtx* pCtx = pMsg->ctx;
if (pCtx->retryMaxTimeout != -1 && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
return 1;
}
return 0;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
cliRetryMayInitCtx(pInst, pMsg);
if (!cliRetryShouldRetry(pInst, pResp)) {
return false; return false;
} }
if (cliRetryIsTimeout(pInst, pMsg)) {
return false;
}
// code, msgType // code, msgType
// A: epset,leader, not self // A: epset,leader, not self
@ -3045,7 +3045,12 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
} }
pMsg->sent = 0; pMsg->sent = 0;
cliSchedMsgToNextNode(pMsg, pThrd); code = cliSchedMsgToNextNode(pMsg, pThrd);
if (code != 0) {
pResp->code = code;
tError("failed to sched msg to next node, reason:%s", tstrerror(code));
return false;
}
return true; return true;
} }
void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) { void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) {
@ -3067,7 +3072,7 @@ void cliMayReSetRespCode(STransConnCtx* pCtx, STransMsg* pResp) {
} }
} }
} }
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { int cliNotifyCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
@ -3277,7 +3282,7 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S
int64_t handle = (int64_t)pReq->info.handle; int64_t handle = (int64_t)pReq->info.handle;
SCliThrd* pThrd = transGetWorkThrd(pInst, handle); SCliThrd* pThrd = transGetWorkThrd(pInst, handle);
if (pThrd == NULL) { if (pThrd == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception;); TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception);
} }
if (handle != 0) { if (handle != 0) {
@ -3702,6 +3707,7 @@ _exception:
return code; return code;
} }
// conn heap
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args1 = container_of(a, SCliConn, node);
SCliConn* args2 = container_of(b, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node);
@ -3724,7 +3730,7 @@ void transHeapDestroy(SHeap* heap) {
heapDestroy(heap->heap); heapDestroy(heap->heap);
} }
} }
int transHeapGet(SHeap* heap, SCliConn** p) { int32_t transHeapGet(SHeap* heap, SCliConn** p) {
if (heapSize(heap->heap) == 0) { if (heapSize(heap->heap) == 0) {
*p = NULL; *p = NULL;
return -1; return -1;
@ -3737,7 +3743,7 @@ int transHeapGet(SHeap* heap, SCliConn** p) {
*p = container_of(minNode, SCliConn, node); *p = container_of(minNode, SCliConn, node);
return 0; return 0;
} }
int transHeapInsert(SHeap* heap, SCliConn* p) { int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
// impl later // impl later
if (p->inHeap == 1) { if (p->inHeap == 1) {
return TSDB_CODE_DUP_KEY; return TSDB_CODE_DUP_KEY;
@ -3747,7 +3753,7 @@ int transHeapInsert(SHeap* heap, SCliConn* p) {
p->inHeap = 1; p->inHeap = 1;
return 0; return 0;
} }
int transHeapDelete(SHeap* heap, SCliConn* p) { int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
// impl later // impl later
if (p->inHeap == 0) { if (p->inHeap == 0) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;

View File

@ -330,6 +330,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
(void)taosThreadMutexLock(&item->mtx); (void)taosThreadMutexLock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q); QUEUE_PUSH(&item->qmsg, q);
(void)taosThreadMutexUnlock(&item->mtx); (void)taosThreadMutexUnlock(&item->mtx);
int ret = uv_async_send(async); int ret = uv_async_send(async);
if (ret != 0) { if (ret != 0) {
tError("failed to send async,reason:%s", uv_err_name(ret)); tError("failed to send async,reason:%s", uv_err_name(ret));