From 4be196491e6671e383187bc703a826cbc87f9bc2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 5 Dec 2022 22:39:18 +0800 Subject: [PATCH 1/4] add fail-fast --- include/libs/transport/trpc.h | 6 +++ source/libs/transport/inc/transportInt.h | 4 ++ source/libs/transport/src/trans.c | 4 ++ source/libs/transport/src/transCli.c | 57 ++++++++++++++++++++++-- 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d761813db1..87f753e6aa 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -72,6 +72,7 @@ typedef struct SRpcMsg { typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); +typedef bool (*RpcFFfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -90,6 +91,9 @@ typedef struct SRpcInit { int32_t retryMaxInterval; // retry max interval int64_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not @@ -107,6 +111,8 @@ typedef struct SRpcInit { // destroy client ahandle; RpcDfp dfp; + // fail fast fp + RpcFFfp ffp; void *parent; } SRpcInit; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 833937aa41..57aba67b1d 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -57,10 +57,14 @@ typedef struct { int32_t retryMaxInterval; // retry max interval int32_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); + bool (*failFastFp)(tmsg_t msgType); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c6a5cfdc95..0eac12f7c5 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + pRpc->failFastThreshold = pInit->failFastThreshold; + pRpc->failFastInterval = pInit->failFastInterval; + // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; + pRpc->failFastFp = pInit->ffp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7339d487d1..3e9b27dc67 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,6 +84,8 @@ typedef struct SCliThrd { SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; + SHashObj* failFastCache; + SCliMsg* stopMsg; bool quit; @@ -96,6 +98,13 @@ typedef struct SCliObj { SCliThrd** pThreadObj; } SCliObj; +typedef struct { + int32_t reinit; + int64_t timestamp; + int32_t count; + int32_t threshold; + int64_t interval; +} SFailFastItem; // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); @@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) { int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { - tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), uv_err_name(status)); cliHandleExcept(pConn); } @@ -863,7 +872,6 @@ _RETURN: } void cliConnCb(uv_connect_t* req, int status) { - // impl later SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; @@ -875,7 +883,29 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { - tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); + tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip, + pConn->port, uv_strerror(status)); + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + if (REQUEST_NO_RESP(&pMsg->msg)) { + char* ip = pConn->ip; + uint32_t port = pConn->port; + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + int64_t cTimestamp = taosGetTimestampMs(); + if (item != NULL) { + if (cTimestamp - item->timestamp < ((STrans*)pThrd->pTransInst)->failFastInterval) { + item->count++; + } else { + item->count = 1; + item->timestamp = cTimestamp; + } + } else { + SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; + taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem)); + } + } cliHandleExcept(pConn); return; } @@ -1027,6 +1057,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + if (REQUEST_NO_RESP(&pMsg->msg)) { + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + if (item != NULL) { + int32_t elapse = taosGetTimestampMs() - item->timestamp; + if (item->count >= pTransInst->failFastThreshold && elapse <= pTransInst->failFastInterval) { + STraceId* trace = &(pMsg->msg.info.traceId); + tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg, + ip, port, item->count, elapse); + destroyCmsg(pMsg); + return; + } + } + } + bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { @@ -1299,6 +1348,8 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->quit = false; return pThrd; } From 9ce9d2150748310480d6e3bd3f563707b1e98774 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 6 Dec 2022 10:30:14 +0800 Subject: [PATCH 2/4] support fail fast --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 9 +++++++++ source/libs/transport/src/transCli.c | 8 +++++--- source/os/src/osTime.c | 2 +- source/util/src/tlog.c | 2 +- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 12aba130d5..04c12abcf9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { return (*msgFp)(pWrapper->pMgmt, pMsg); } +static bool dmFailFastFp(tmsg_t msgType) { + // add more msg type later + return msgType == TDMT_SYNC_HEARTBEAT; +} + static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; @@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.failFastInterval = 1000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3e9b27dc67..c74d9839c8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -886,7 +886,8 @@ void cliConnCb(uv_connect_t* req, int status) { tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip, pConn->port, uv_strerror(status)); SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - if (REQUEST_NO_RESP(&pMsg->msg)) { + STrans* pTransInst = pThrd->pTransInst; + if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { char* ip = pConn->ip; uint32_t port = pConn->port; char key[TSDB_FQDN_LEN + 64] = {0}; @@ -895,7 +896,7 @@ void cliConnCb(uv_connect_t* req, int status) { SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { - if (cTimestamp - item->timestamp < ((STrans*)pThrd->pTransInst)->failFastInterval) { + if (cTimestamp - item->timestamp < pTransInst->failFastInterval) { item->count++; } else { item->count = 1; @@ -1057,7 +1058,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } - if (REQUEST_NO_RESP(&pMsg->msg)) { + if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0}; @@ -1376,6 +1377,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); taosMemoryFree(pThrd); } diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 68dfba14e9..cd4324a592 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { offsetInitFinished = true; } else { while (!offsetInitFinished) - ; // Ensure initialization is completed. + ; // Ensure initialization is completed. } GetSystemTimeAsFileTime(&f); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index fc9d90c985..be1db74f1a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons if (!osLogSpaceAvailable()) return; if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return; - char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); + char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); int32_t len = taosBuildLogHead(buffer, flags); va_list argpointer; From 4e87bcc97af1008d47f8c2034032e8c9dee23757 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 6 Dec 2022 10:53:31 +0800 Subject: [PATCH 3/4] support fail fast --- source/libs/transport/src/transCli.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c74d9839c8..dad3806d3f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -896,7 +896,8 @@ void cliConnCb(uv_connect_t* req, int status) { SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { - if (cTimestamp - item->timestamp < pTransInst->failFastInterval) { + int32_t elapse = cTimestamp - item->timestamp; + if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { item->count++; } else { item->count = 1; @@ -1066,8 +1067,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); if (item != NULL) { - int32_t elapse = taosGetTimestampMs() - item->timestamp; - if (item->count >= pTransInst->failFastThreshold && elapse <= pTransInst->failFastInterval) { + int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); + if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { STraceId* trace = &(pMsg->msg.info.traceId); tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg, ip, port, item->count, elapse); From 0254e90ce61e9804c177c25d7398ed182fdab853 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 6 Dec 2022 13:42:59 +0800 Subject: [PATCH 4/4] avoid null pointer --- source/libs/transport/src/transCli.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dad3806d3f..187e17fc30 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -887,7 +887,8 @@ void cliConnCb(uv_connect_t* req, int status) { pConn->port, uv_strerror(status)); SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); STrans* pTransInst = pThrd->pTransInst; - if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { + if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && + (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { char* ip = pConn->ip; uint32_t port = pConn->port; char key[TSDB_FQDN_LEN + 64] = {0};