diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index da6d71e07b..9119ae083c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -187,6 +187,7 @@ typedef struct { uint32_t code; // del later uint32_t msgType; int32_t msgLen; + int32_t seqNum; uint8_t content[0]; // message body starts from here } STransMsgHead; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d075ace7d5..8ec54167af 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,6 +84,7 @@ typedef struct SCliConn { char dst[32]; int64_t refId; + int32_t seq; } SCliConn; typedef struct SCliMsg { @@ -96,6 +97,7 @@ typedef struct SCliMsg { uint64_t st; int sent; //(0: no send, 1: alread sent) queue seqq; + int32_t seqNum; } SCliMsg; typedef struct SCliThrd { @@ -402,6 +404,7 @@ void cliResetConnTimer(SCliConn* conn) { } } void cliHandleBatchResp(SCliConn* conn) { + ASSERT(0); SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; cliResetConnTimer(conn); @@ -430,9 +433,70 @@ void cliHandleBatchResp(SCliConn* conn) { return; } } + +SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) { + SCliMsg* pMsg = NULL; + for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { + pMsg = transQueueGet(&conn->cliMsgs, i); + if (pMsg->seqNum == seqNum) { + transQueueRm(&conn->cliMsgs, i); + break; + } + } + if (pMsg == NULL) { + ASSERT(0); + } + return pMsg; +} +void cliHandleResp_shareConn(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + cliResetConnTimer(conn); + + STransMsgHead* pHead = NULL; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + + if (msgLen <= 0) { + taosMemoryFree(pHead); + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { + tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); + } + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + + STransMsg transMsg = {0}; + transMsg.contLen = transContLenFromMsg(pHead->msgLen); + transMsg.pCont = transContFromHead((char*)pHead); + transMsg.code = pHead->code; + transMsg.msgType = pHead->msgType; + transMsg.info.ahandle = NULL; + transMsg.info.traceId = pHead->traceId; + transMsg.info.hasEpSet = pHead->hasEpSet; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); + + SCliMsg* pMsg = cliFindMsgBySeqnum(conn, pHead->seqNum); + pMsg->seqNum = 0; + + STransConnCtx* pCtx = pMsg->ctx; + transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; + STraceId* trace = &transMsg.info.traceId; + + if (cliAppCb(conn, &transMsg, pMsg) != 0) { + return; + } + + return; +} void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; + + if (pTransInst->shareConn) { + return cliHandleResp_shareConn(conn); + } cliResetConnTimer(conn); STransMsgHead* pHead = NULL; @@ -716,6 +780,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); conn->task = NULL; } + conn->seq++; return conn; } @@ -813,6 +878,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { } cliDestroyConnMsgs(conn, false); + conn->seq = 0; if (conn->list == NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); @@ -871,6 +937,7 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { } static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { + if (handle == 0) return -1; if (update) { transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -958,7 +1025,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->status = ConnNormal; conn->broken = false; transRefCliHandle(conn); - + conn->seq = 0; allocConnRef(conn, false); return conn; @@ -1078,7 +1145,6 @@ static void cliSendCb(uv_write_t* req, int status) { static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { SCliConn* conn = req->data; - SCliThrd* thrd = conn->hostThrd; if (status != 0) { tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); if (!uv_is_closing((uv_handle_t*)&conn->stream)) { @@ -1093,7 +1159,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; int32_t size = transQueueSize(&pConn->cliMsgs); - int32_t totalLen = 0; + + int32_t totalLen = 0; if (size == 0) { tError("%s conn %p not msg to send", pTransInst->label, pConn); cliHandleExcept(pConn); @@ -1101,10 +1168,14 @@ void cliSendBatch_shareConn(SCliConn* pConn) { } uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); + int j = 0; for (int i = 0; i < size; i++) { SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i); - + if (pCliMsg->sent == 1) { + continue; + } STransConnCtx* pCtx = pCliMsg->ctx; + pConn->seq++; STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); if (pMsg->pCont == 0) { @@ -1112,7 +1183,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pMsg->contLen = 0; } - int msgLen = transMsgLenFromCont(pMsg->contLen); + int msgLen = transMsgLenFromCont(pMsg->contLen); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); if (pHead->comp == 0) { @@ -1129,6 +1201,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); + pHead->seqNum = pConn->seq; if (pHead->comp == 0) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { @@ -1138,14 +1211,17 @@ void cliSendBatch_shareConn(SCliConn* pConn) { } else { msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); } - wb[i++] = uv_buf_init((char*)pHead, msgLen); + wb[j++] = uv_buf_init((char*)pHead, msgLen); totalLen += msgLen; + + pCliMsg->sent = 1; + pCliMsg->seqNum = pHead->seqNum; } uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); - uv_write(req, (uv_stream_t*)pConn->stream, wb, size, cliSendBatch_shareConnCb); + uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); taosMemoryFree(wb); } void cliSendBatch(SCliConn* pConn) { @@ -1422,41 +1498,30 @@ static void cliSendBatchCb(uv_write_t* req, int status) { cliDestroyBatch(p); taosMemoryFree(req); } -static void cliHandleFastFail(SCliConn* pConn, int status) { + +static void cliHandleFastFail_resp(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + STraceId* trace = &pMsg->msg.info.traceId; + tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), + TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); +} + +static void cliHandleFastFail_noresp(SCliConn* pConn, int status) { + tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn, + pConn->dstAddr, uv_strerror(status)); + cliDestroyBatch(pConn->pBatch); + pConn->pBatch = NULL; +} +static void cliHandleFastFail(SCliConn* pConn, int status) { if (status == -1) status = UV_EADDRNOTAVAIL; if (pConn->pBatch == NULL) { - SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - - STraceId* trace = &pMsg->msg.info.traceId; - tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); - - if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && - (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr)); - int64_t cTimestamp = taosGetTimestampMs(); - if (item != NULL) { - int32_t elapse = cTimestamp - item->timestamp; - if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { - item->count++; - } else { - item->count = 1; - item->timestamp = cTimestamp; - } - } else { - SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); - } - } + cliHandleFastFail_resp(pConn, status); } else { - tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - pConn, pConn->dstAddr, uv_strerror(status)); - cliDestroyBatch(pConn->pBatch); - pConn->pBatch = NULL; + cliHandleFastFail_noresp(pConn, status); } cliHandleExcept(pConn); } @@ -1616,9 +1681,8 @@ FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { if (code != 0) return false; - // if (pCtx->retryCnt == 0) return false; - if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; - return true; + + return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; } FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; @@ -1686,11 +1750,12 @@ static void doFreeTimeoutMsg(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; STrans* pTransInst = pThrd->pTransInst; - int32_t code = TSDB_CODE_RPC_MAX_SESSIONS; + QUEUE_REMOVE(&pMsg->q); STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyApp(pMsg, pThrd, code); + doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); + taosMemoryFree(arg); } @@ -1718,6 +1783,19 @@ static void addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* p } transHeapInsert(p, pConn); } +static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* pConn) { + size_t klen = strlen(key); + + SHeap* p = taosHashGet(pConnHeapCache, key, klen); + if (p == NULL) { + tDebug("failed to get heap cache for key:%s, no need to del", key); + return; + } + int ret = transHeapDelete(p, pConn); + if (ret != 0) { + tDebug("failed to delete conn %p from heap cache, no need to del", pConn); + } +} void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { STraceId* trace = &pMsg->msg.info.traceId; @@ -1742,6 +1820,7 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { } pConn = cliCreateConn(pThrd); + pConn->dstAddr = taosStrdup(addr); addConnToHeapCache(pThrd->connHeapCache, addr, pConn); return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); @@ -1790,15 +1869,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliSend(conn); } else { conn = cliCreateConn(pThrd); + conn->dstAddr = taosStrdup(addr); - int64_t refId = (int64_t)pMsg->msg.info.handle; - if (refId != 0) specifyConnRef(conn, true, refId); + specifyConnRef(conn, true, (int64_t)pMsg->msg.info.handle); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - conn->dstAddr = taosStrdup(addr); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { cliResetConnTimer(conn); @@ -2234,7 +2311,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -2267,7 +2343,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); - taosHashCleanup(pThrd->failFastCache); void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fff13e7ebb..edc2d2b899 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -326,10 +326,6 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) { STransCtxVal* sVal = (STransCtxVal*)iter; key = taosHashGetKey(sVal, &klen); - // STransCtxVal* dVal = taosHashGet(dst->args, key, klen); - // if (dVal) { - // dst->freeFunc(dVal->val); - // } taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); iter = taosHashIterate(src->args, iter); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d47968eeb8..42189de5d4 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -336,6 +336,59 @@ void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) { pConn->whiteListVer = pWhite->ver; } +static void uvPerfLog(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) { + STrans* pTransInst = pConn->pTransInst; + STraceId* trace = &pHead->traceId; + + int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp); + static int64_t EXCEPTION_LIMIT_US = 100 * 1000; + + if (pConn->status == ConnNormal && pHead->noResp == 0) { + transRefSrvHandle(pConn); + if (cost >= EXCEPTION_LIMIT_US) { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", + transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + (int)cost); + } else { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, + TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost); + } + } else { + if (cost >= EXCEPTION_LIMIT_US) { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", + transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + pHead->noResp, pTransMsg->code, (int)(cost)); + } else { + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", + transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + pHead->noResp, pTransMsg->code, (int)(cost)); + } + } + tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), pTransMsg->info.handle, + pConn, pConn->refId); +} + +static int8_t uvValidConn(SSvrConn* pConn) { + STrans* pTransInst = pConn->pTransInst; + SWorkThrd* pThrd = pConn->hostThrd; + int8_t forbiddenIp = 0; + if (pThrd->enableIpWhiteList) { + forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0; + if (forbiddenIp == 0) { + uvWhiteListSetConnVer(pThrd->pWhiteList, pConn); + } + } + return forbiddenIp; +} +static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) { + if (pConn->status == ConnNormal) { + if (pHead->persist == 1) { + pConn->status = ConnAcquire; + transRefSrvHandle(pConn); + tDebug("conn %p acquired by server app", pConn); + } + } +} static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; SWorkThrd* pThrd = pConn->hostThrd; @@ -358,14 +411,6 @@ static bool uvHandleReq(SSvrConn* pConn) { pConn->inType = pHead->msgType; memcpy(pConn->user, pHead->user, strlen(pHead->user)); - int8_t forbiddenIp = 0; - if (pThrd->enableIpWhiteList) { - forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0; - if (forbiddenIp == 0) { - uvWhiteListSetConnVer(pThrd->pWhiteList, pConn); - } - } - if (uvRecvReleaseReq(pConn, pHead)) { return true; } @@ -384,38 +429,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - if (pConn->status == ConnNormal) { - if (pHead->persist == 1) { - pConn->status = ConnAcquire; - transRefSrvHandle(pConn); - tDebug("conn %p acquired by server app", pConn); - } - } - STraceId* trace = &pHead->traceId; - - int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp); - static int64_t EXCEPTION_LIMIT_US = 100 * 1000; - - if (pConn->status == ConnNormal && pHead->noResp == 0) { - transRefSrvHandle(pConn); - if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); - } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); - } - } else { - if (cost >= EXCEPTION_LIMIT_US) { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); - } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", - transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, - transMsg.code, (int)(cost)); - } - } + uvMaySetConnAcquired(pConn, pHead); // pHead->noResp = 1, // 1. server application should not send resp on handle @@ -423,21 +437,14 @@ static bool uvHandleReq(SSvrConn* pConn) { // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); - transMsg.info.refId = pConn->refId; + ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg"); + + transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId; transMsg.info.traceId = pHead->traceId; transMsg.info.cliVer = htonl(pHead->compatibilityVer); - transMsg.info.forbiddenIp = forbiddenIp; + transMsg.info.forbiddenIp = uvValidConn(pConn); - tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, - pConn->refId); - ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg"); - if (transMsg.info.handle == NULL) { - return false; - } - - if (pHead->noResp == 1) { - transMsg.info.refId = -1; - } + uvPerfLog(pConn, pHead, &transMsg); // set up conn info SRpcConnInfo* pConnInfo = &(transMsg.info.conn);