From 0b1e54568c5651a8a6d4f84fa256dde217a304b0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 Sep 2024 18:42:36 +0800 Subject: [PATCH] opt transport --- source/common/src/tglobal.c | 58 ++++----- source/libs/transport/inc/transComm.h | 7 +- source/libs/transport/src/transCli.c | 168 +++++++++++++++++--------- source/libs/transport/src/transComm.c | 31 +++-- source/libs/transport/src/transSvr.c | 65 +++++++--- 5 files changed, 210 insertions(+), 119 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cf0a4725c1..69171892cb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -177,12 +177,12 @@ int32_t tsRedirectFactor = 2; int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; -int32_t tsMetaCacheMaxSize = -1; // MB -int32_t tsSlowLogThreshold = 10; // seconds -int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds -char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds +int32_t tsMetaCacheMaxSize = -1; // MB +int32_t tsSlowLogThreshold = 10; // seconds +int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds +char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY; -char* tsSlowLogScopeString = "query"; +char *tsSlowLogScopeString = "query"; int32_t tsSlowLogMaxLen = 4096; int32_t tsTimeSeriesThreshold = 50; bool tsMultiResultFunctionStarReturnTags = false; @@ -320,7 +320,6 @@ int32_t tsMaxTsmaNum = 3; int32_t tsMaxTsmaCalcDelay = 600; int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d - #define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \ if ((pItem = cfgGetItem(pCfg, pName)) == NULL) { \ TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND); \ @@ -359,7 +358,7 @@ static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSD TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, name); char *strDup = NULL; - if ((strDup = taosStrdup(pItem->str))== NULL){ + if ((strDup = taosStrdup(pItem->str)) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -448,7 +447,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { TAOS_RETURN(TSDB_CODE_SUCCESS); } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -596,10 +597,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { TAOS_CHECK_RETURN( cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); - TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); + TAOS_CHECK_RETURN( + cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); - tsNumOfRpcThreads = tsNumOfCores / 2; + // tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH, CFG_DYN_NONE)); @@ -862,7 +864,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(pCfg, "numOfRpcThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfRpcThreads = numOfCores / 2; + // tsNumOfRpcThreads = numOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); pItem->i32 = tsNumOfRpcThreads; pItem->stype = stype; @@ -1072,9 +1074,9 @@ int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope) { int32_t slowScope = 0; - char* scope = NULL; - char *tmp = NULL; - while((scope = strsep(&pScopeStr, "|")) != NULL){ + char *scope = NULL; + char *tmp = NULL; + while ((scope = strsep(&pScopeStr, "|")) != NULL) { taosMemoryFreeClear(tmp); tmp = taosStrdup(scope); (void)strtrim(tmp); @@ -1128,13 +1130,13 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { (void)snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "firstEp"); - SEp firstEp = {0}; + SEp firstEp = {0}; TAOS_CHECK_RETURN(taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? defaultFirstEp : pItem->str, &firstEp)); (void)snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); TAOS_CHECK_RETURN(cfgSetItem(pCfg, "firstEp", tsFirst, pItem->stype, true)); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "secondEp"); - SEp secondEp = {0}; + SEp secondEp = {0}; TAOS_CHECK_RETURN(taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? defaultFirstEp : pItem->str, &secondEp)); (void)snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port); TAOS_CHECK_RETURN(cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype, true)); @@ -1622,8 +1624,8 @@ static int32_t taosSetAllDebugFlag(SConfig *pCfg, int32_t flag); int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SConfig *pCfg = NULL; if (tsCfg == NULL) { @@ -1691,7 +1693,7 @@ int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char * TAOS_CHECK_RETURN(cfgInit(&pCfg)); TAOS_CHECK_GOTO(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE), NULL, _exit); - TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) ,NULL, _exit); + TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER), NULL, _exit); if ((code = taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl)) != 0) { printf("failed to load cfg since %s\n", tstrerror(code)); @@ -1720,7 +1722,7 @@ _exit: static int32_t taosCheckGlobalCfg() { uint32_t ipv4 = 0; - int32_t code = taosGetIpv4FromFqdn(tsLocalFqdn, &ipv4); + int32_t code = taosGetIpv4FromFqdn(tsLocalFqdn, &ipv4); if (code) { uError("failed to get ip from fqdn:%s since %s, dnode can not be initialized", tsLocalFqdn, tstrerror(code)); TAOS_RETURN(TSDB_CODE_RPC_FQDN_ERROR); @@ -1825,7 +1827,7 @@ typedef struct { static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) { int32_t code = TSDB_CODE_CFG_NOT_FOUND; - char *name = pItem->name; + char *name = pItem->name; for (int32_t d = 0; d < optionSize; ++d) { const char *optName = pOptions[d].optionName; if (strcasecmp(name, optName) != 0) continue; @@ -2012,8 +2014,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { } case 'f': { if (strcasecmp("fqdn", name) == 0) { - SConfigItem* pFqdnItem = cfgGetItem(pCfg, "fqdn"); - SConfigItem* pServerPortItem = cfgGetItem(pCfg, "serverPort"); + SConfigItem *pFqdnItem = cfgGetItem(pCfg, "fqdn"); + SConfigItem *pServerPortItem = cfgGetItem(pCfg, "serverPort"); SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp"); if (pFqdnItem == NULL || pServerPortItem == NULL || pFirstEpItem == NULL) { uError("failed to get fqdn or serverPort or firstEp from cfg"); @@ -2028,7 +2030,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { char defaultFirstEp[TSDB_EP_LEN] = {0}; (void)snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); - SEp firstEp = {0}; + SEp firstEp = {0}; TAOS_CHECK_GOTO( taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp), &lino, _out); @@ -2068,8 +2070,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { } case 'l': { if (strcasecmp("locale", name) == 0) { - SConfigItem* pLocaleItem = cfgGetItem(pCfg, "locale"); - SConfigItem* pCharsetItem = cfgGetItem(pCfg, "charset"); + SConfigItem *pLocaleItem = cfgGetItem(pCfg, "locale"); + SConfigItem *pCharsetItem = cfgGetItem(pCfg, "charset"); if (pLocaleItem == NULL || pCharsetItem == NULL) { uError("failed to get locale or charset from cfg"); code = TSDB_CODE_CFG_NOT_FOUND; @@ -2147,7 +2149,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { char defaultFirstEp[TSDB_EP_LEN] = {0}; (void)snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); - SEp firstEp = {0}; + SEp firstEp = {0}; TAOS_CHECK_GOTO( taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp), &lino, _out); @@ -2275,7 +2277,7 @@ int32_t taosSetGlobalDebugFlag(int32_t flag) { return taosSetAllDebugFlag(tsCfg, // NOTE: set all command does not change the tmrDebugFlag static int32_t taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) { if (flag < 0) TAOS_RETURN(TSDB_CODE_INVALID_PARA); - if (flag == 0) TAOS_RETURN(TSDB_CODE_SUCCESS); // just ignore + if (flag == 0) TAOS_RETURN(TSDB_CODE_SUCCESS); // just ignore SArray *noNeedToSetVars = NULL; SConfigItem *pItem = NULL; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 078ede9d73..4c5b1511b2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -351,12 +351,13 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); // request list typedef struct STransReq { - queue q; - uv_write_t wreq; + queue q; + queue node; + void* conn; } STransReq; void transReqQueueInit(queue* q); -void* transReqQueuePush(queue* q); +void* transReqQueuePush(queue* q, STransReq* req); void* transReqQueueRemove(void* arg); void transReqQueueClear(queue* q); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 994469ba86..23ed1e78a1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -154,7 +154,7 @@ typedef struct SCliThrd { int32_t (*notifyCb)(void* arg, SCliReq* pReq, STransMsg* pResp); int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp); - SHashObj* pIdConnTable; + SHashObj* pIdConnTable; // } SCliThrd; typedef struct SCliObj { @@ -209,6 +209,18 @@ static void cliSendBatchCb(uv_write_t* req, int status); SCliBatch* cliGetHeadFromList(SCliBatchList* pList); +void destroyCliConnQTable(SCliConn* conn) { + void* pIter = taosHashIterate(conn->pQTable, NULL); + while (pIter != NULL) { + int64_t* qid = taosHashGetKey(pIter, NULL); + STransCtx* ctx = pIter; + transCtxCleanup(ctx); + pIter = taosHashIterate(conn->pQTable, pIter); + } + taosHashCleanup(conn->pQTable); + conn->pQTable = NULL; +} + // static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); @@ -266,7 +278,7 @@ static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn); -int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); +int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn); @@ -503,7 +515,9 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe pResp->pCont = transContFromHead((char*)pHead); pResp->code = pHead->code; pResp->msgType = pHead->msgType; - pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; + if (pResp->info.ahandle == 0) { + pResp->info.ahandle = (pReq && pReq->ctx) ? pReq->ctx->ahandle : NULL; + } pResp->info.traceId = pHead->traceId; pResp->info.hasEpSet = pHead->hasEpSet; pResp->info.cliVer = htonl(pHead->compatibilityVer); @@ -513,11 +527,17 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe pResp->info.handle = (void*)qid; return 0; } -int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) { +int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { - int64_t qId = taosHton64(pHead->qid); + int64_t qId = taosHton64(pHead->qid); + STraceId* trace = &pHead->traceId; + tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId); + + STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId)); + transCtxCleanup(p); + code = taosHashRemove(conn->pQTable, &qId, sizeof(qId)); if (code != 0) { tDebug("%s conn %p failed to release req %ld from conn", CONN_GET_INST_LABEL(conn), conn, qId); @@ -527,21 +547,41 @@ int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) { if (code != 0) { tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId); } - STraceId* trace = &pHead->traceId; - tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId); + tDebug("%s %p req size:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqs)); for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) { - SCliReq* pReqs = transQueueGet(&conn->reqs, i); - if (pReqs->msg.info.qId == qId) { + SCliReq* pReq = transQueueGet(&conn->reqs, i); + if (pReq->msg.info.qId == qId) { transQueueRm(&conn->reqs, i); - destroyReq(pReqs); + + if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + pThrd->destroyAhandleFp(pReq->ctx->ahandle); + } + destroyReq(pReq); i--; } } + taosMemoryFree(pHead); return 1; } return 0; } +int32_t cliMayHandleState(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp) { + int32_t code = 0; + int64_t qId = taosHton64(pHead->qid); + if (qId == 0) { + return 0; + } + + STransCtx* pCtx = taosHashGet(conn->pQTable, &qId, sizeof(qId)); + if (pCtx == 0) { + return TSDB_CODE_RPC_NO_STATE; + } + pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType); + tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(conn), conn, pResp->info.ahandle, + TMSG_INFO(pHead->msgType)); + return 0; +} void cliHandleResp2(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; @@ -565,33 +605,38 @@ void cliHandleResp2(SCliConn* conn) { // TODO: notify cb return; } - - if (cliConnMayHandleReleasReq(conn, pHead)) { - if (cliMayRecycleConn(conn)) { - return; - } - - return; - } int64_t qId = taosHton64(pHead->qid); - pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + int32_t seq = htonl(pHead->seqNum); + STransMsg resp = {0}; - int32_t seq = htonl(pHead->seqNum); - - code = cliGetReqBySeq(conn, seq, &pReq); - if (code != 0) { - tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, - tstrerror(code)); - // TODO: notify cb + if (cliConnMayHandleState_releaseReq(conn, pHead)) { if (cliMayRecycleConn(conn)) { return; } return; } + code = cliGetReqBySeq(conn, seq, &pReq); + if (code == TSDB_CODE_OUT_OF_RANGE) { + code = cliMayHandleState(conn, pHead, &resp); + if (code == 0) { + code = cliBuildRespFromCont(NULL, &resp, pHead); + code = cliNotifyCb(conn, NULL, &resp); + return; + } + + if (code != 0) { + tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, + tstrerror(code)); + // TODO: notify cb + if (cliMayRecycleConn(conn)) { + return; + } + return; + } + } - STransMsg resp = {0}; code = cliBuildRespFromCont(pReq, &resp, pHead); STraceId* trace = &resp.info.traceId; if (code != 0) { @@ -1195,8 +1240,9 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); code = cliMayUpdateState(pThrd, pReq, pConn); - transQueuePush(&pConn->reqs, pReq); + addConnToHeapCache(pThrd->connHeapCache, pConn); + transQueuePush(&pConn->reqs, pReq); return cliDoConn(pThrd, pConn); _exception: // free conn @@ -1264,6 +1310,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int _failed: if (conn) { taosMemoryFree(conn->stream); + + destroyCliConnQTable(conn); taosHashCleanup(conn->pQTable); (void)transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqs); @@ -1339,10 +1387,10 @@ static void cliDestroy(uv_handle_t* handle) { } cliDestroyConnMsgs(conn, true); + destroyCliConnQTable(conn); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); - taosHashCleanup(conn->pQTable); (void)transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); @@ -1673,7 +1721,7 @@ void cliConnCb(uv_connect_t* req, int status) { cliConnSetSockInfo(pConn); - addConnToHeapCache(pThrd->connHeapCache, pConn); + // addConnToHeapCache(pThrd->connHeapCache, pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); (void)cliSend2(pConn); @@ -1941,21 +1989,22 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } -int32_t cliConnHandleQueryById(SCliReq* pReq) { - if (pReq->msg.info.handle == 0) { - return 0; +int32_t clConnMayUpdateReqCtx(SCliConn* pConn, SCliReq* pReq) { + int32_t code = 0; + int64_t qid = pReq->msg.info.qId; + SReqCtx* pCtx = pReq->ctx; + + STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid)); + if (pUserCtx == NULL) { + code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx)); + tDebug("succ to add conn %p of statue ctx, qid:%ld", pConn, qid); } else { - int64_t queryId = (int64_t)pReq->msg.info.handle; - SExHandle* exh = transAcquireExHandle(transGetRefMgt(), queryId); - if (exh->inited == 1) { - } else { - } - transReleaseExHandle(transGetRefMgt(), queryId); + transCtxMerge(pUserCtx, &pCtx->userCtx); + tDebug("succ to update conn %p of statue ctx, qid:%ld", pConn, qid); } return 0; } - -int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { +int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { int32_t code = 0; int64_t qid = pReq->msg.info.qId; if (qid == 0) { @@ -1965,12 +2014,13 @@ int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); if (pState == NULL) { tDebug("failed to get statue, qid:%ld", qid); + // ASSERT(0); return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } else { *pConn = pState->conn; tDebug("succ to get conn of statue, qid:%ld", qid); } - return code; + return 0; } int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { @@ -1984,6 +2034,7 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { tDebug("succ to get conn %p of statue, qid:%ld", pConn, qid); ASSERT(0); } + SReqState state = {.conn = pConn, .arg = NULL}; code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state)); if (code != 0) { @@ -1992,13 +2043,7 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { tDebug("succ to add conn %p of statue, qid:%ld (1)", pConn, qid); } - int32_t dummy = 0; - code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &dummy, sizeof(dummy)); - if (code != 0) { - tDebug("failed to add conn %p of statue, qid:%ld", pConn, qid); - } else { - tDebug("succ to add conn %p of statue, qid:%ld(2)", pConn, qid); - } + (void)clConnMayUpdateReqCtx(pConn, pReq); return code; } void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { @@ -2009,7 +2054,10 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { STrans* pInst = pThrd->pInst; SCliConn* pConn = NULL; - code = cliMayGetHandleState(pThrd, pReq, &pConn); + code = cliMayGetStateByQid(pThrd, pReq, &pConn); + if (code == 0) { + (void)clConnMayUpdateReqCtx(pConn, pReq); + } if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { char addr[TSDB_FQDN_LEN + 64] = {0}; @@ -2933,9 +2981,13 @@ void cliMayResetRespCode(SCliReq* pReq, STransMsg* pResp) { int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - SReqCtx* pCtx = pReq->ctx; + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; STraceId* trace = &pResp->info.traceId; + if (pCtx == NULL) { + pInst->cfp(pInst->parent, pResp, NULL); + return 0; + } if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pSem) { @@ -2978,14 +3030,16 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - if (cliMayRetry(pConn, pReq, pResp)) { - return TSDB_CODE_RPC_ASYNC_IN_PROCESS; - } + if (pReq != NULL) { + if (cliMayRetry(pConn, pReq, pResp)) { + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; + } - cliMayResetRespCode(pReq, pResp); + cliMayResetRespCode(pReq, pResp); - if (cliTryUpdateEpset(pReq, pResp)) { - cliPerfLog_epset(pConn, pReq); + if (cliTryUpdateEpset(pReq, pResp)) { + cliPerfLog_epset(pConn, pReq); + } } return cliNotifyImplCb(pConn, pReq, pResp); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 60058bbbd2..326fd434ec 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -15,7 +15,7 @@ #include "transComm.h" -#define BUFFER_CAP 4096 +#define BUFFER_CAP 16 * 4096 static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; @@ -341,7 +341,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { void transCtxInit(STransCtx* ctx) { // init transCtx - ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); + ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); } void transCtxCleanup(STransCtx* ctx) { if (ctx->args == NULL) { @@ -350,6 +350,8 @@ void transCtxCleanup(STransCtx* ctx) { STransCtxVal* iter = taosHashIterate(ctx->args, NULL); while (iter) { + int32_t* type = taosHashGetKey(iter, NULL); + tDebug("free msg type %s dump func", TMSG_INFO(*type)); ctx->freeFunc(iter->val); iter = taosHashIterate(ctx->args, iter); } @@ -409,27 +411,22 @@ void transReqQueueInit(queue* q) { // init req queue QUEUE_INIT(q); } -void* transReqQueuePush(queue* q) { - STransReq* req = taosMemoryCalloc(1, sizeof(STransReq)); - if (req == NULL) { - return NULL; - } - req->wreq.data = req; - QUEUE_PUSH(q, &req->q); - return &req->wreq; +void* transReqQueuePush(queue* q, STransReq* userReq) { + uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + req->data = userReq; + + QUEUE_PUSH(q, &userReq->q); + return req; } void* transReqQueueRemove(void* arg) { void* ret = NULL; - uv_write_t* wreq = arg; + uv_write_t* req = arg; - STransReq* req = wreq ? wreq->data : NULL; + STransReq* userReq = req ? req->data : NULL; if (req == NULL) return NULL; - QUEUE_REMOVE(&req->q); + QUEUE_REMOVE(&userReq->q); - ret = wreq && wreq->handle ? wreq->handle->data : NULL; - taosMemoryFree(req); - - return ret; + return userReq; } void transReqQueueClear(queue* q) { while (!QUEUE_IS_EMPTY(q)) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1bcd8e8324..2d70e3e344 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -72,6 +72,9 @@ typedef struct SSvrMsg { void* arg; FilteFunc func; int8_t sent; + + queue sendReq; + } SSvrMsg; typedef struct { @@ -618,17 +621,35 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnSendCb(uv_write_t* req, int status) { STUB_RAND_NETWORK_ERR(status); - SSvrConn* conn = transReqQueueRemove(req); - if (conn == NULL) return; + + queue q; + QUEUE_INIT(&q); + + STransReq* userReq = transReqQueueRemove(req); + SSvrConn* conn = userReq->conn; + + queue* src = &userReq->node; + while (!QUEUE_IS_EMPTY(src)) { + queue* head = QUEUE_HEAD(src); + QUEUE_REMOVE(head); + QUEUE_PUSH(&q, head); + // } + } + // QUEUE_MOVE(src, &q); + + tDebug("%s conn %p send data", transLabel(conn->pInst), conn); if (status == 0) { - for (int32_t i = 0; i < transQueueSize(&conn->srvMsgs); i++) { - SSvrMsg* smsg = transQueueGet(&conn->srvMsgs, i); - if (smsg->sent == 1) { - transQueueRm(&conn->srvMsgs, i); - destroySmsg(smsg); - i--; - } + while (!QUEUE_IS_EMPTY(&q)) { + queue* head = QUEUE_HEAD(&q); + QUEUE_REMOVE(head); + + SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); + + STraceId* trace = &smsg->msg.info.traceId; + tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn, + smsg->msg.info.seqNum, smsg->msg.info.qId); + destroySmsg(smsg); } } else { if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) { @@ -705,7 +726,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { wb->len = len; return 0; } -static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum) { +static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* sendReqNode) { int32_t count = 0; int32_t size = transQueueSize(&pConn->srvMsgs); @@ -713,7 +734,9 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf if (pWb == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - for (int32_t i = 0; i < size; i++) { + + tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size); + for (int32_t i = 0; i < transQueueSize(&pConn->srvMsgs); i++) { SSvrMsg* pMsg = transQueueGet(&pConn->srvMsgs, i); if (pMsg->sent == 1) { continue; @@ -721,8 +744,12 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf uv_buf_t wb; (void)uvPrepareSendData(pMsg, &wb); pWb[count] = wb; - pMsg->sent = 1; + + QUEUE_PUSH(sendReqNode, &pMsg->sendReq); + + transQueueRm(&pConn->srvMsgs, i); + i--; count++; } @@ -743,20 +770,30 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { if (pConn->broken) { return; } + queue sendReqNode; + QUEUE_INIT(&sendReqNode); uv_buf_t* pBuf = NULL; int32_t bufNum = 0; - code = uvBuildToSendData(pConn, &pBuf, &bufNum); + code = uvBuildToSendData(pConn, &pBuf, &bufNum, &sendReqNode); if (code != 0) { tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn); return; } if (bufNum == 0) { + tDebug("%s conn %p no data to send", transLabel(pConn->pInst), pConn); return; } transRefSrvHandle(pConn); - uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); + + STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq)); + pWreq->conn = pConn; + QUEUE_INIT(&pWreq->q); + + QUEUE_MOVE(&sendReqNode, &pWreq->node); + + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue, pWreq); if (req == NULL) { if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) { tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY));