opt transport

This commit is contained in:
yihaoDeng 2024-09-09 18:42:36 +08:00
parent 0b440aad0c
commit 0b1e54568c
5 changed files with 210 additions and 119 deletions

View File

@ -177,12 +177,12 @@ int32_t tsRedirectFactor = 2;
int32_t tsRedirectMaxPeriod = 1000;
int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 10; // seconds
int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 10; // seconds
int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
char* tsSlowLogScopeString = "query";
char *tsSlowLogScopeString = "query";
int32_t tsSlowLogMaxLen = 4096;
int32_t tsTimeSeriesThreshold = 50;
bool tsMultiResultFunctionStarReturnTags = false;
@ -320,7 +320,6 @@ int32_t tsMaxTsmaNum = 3;
int32_t tsMaxTsmaCalcDelay = 600;
int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d
#define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \
if ((pItem = cfgGetItem(pCfg, pName)) == NULL) { \
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND); \
@ -359,7 +358,7 @@ static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSD
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, name);
char *strDup = NULL;
if ((strDup = taosStrdup(pItem->str))== NULL){
if ((strDup = taosStrdup(pItem->str)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -448,7 +447,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
struct SConfig *taosGetCfg() { return tsCfg; }
struct SConfig *taosGetCfg() {
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {
@ -596,10 +597,11 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
tsNumOfRpcThreads = tsNumOfCores / 2;
// tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH, CFG_DYN_NONE));
@ -862,7 +864,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(pCfg, "numOfRpcThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfRpcThreads = numOfCores / 2;
// tsNumOfRpcThreads = numOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
pItem->i32 = tsNumOfRpcThreads;
pItem->stype = stype;
@ -1072,9 +1074,9 @@ int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope) {
int32_t slowScope = 0;
char* scope = NULL;
char *tmp = NULL;
while((scope = strsep(&pScopeStr, "|")) != NULL){
char *scope = NULL;
char *tmp = NULL;
while ((scope = strsep(&pScopeStr, "|")) != NULL) {
taosMemoryFreeClear(tmp);
tmp = taosStrdup(scope);
(void)strtrim(tmp);
@ -1128,13 +1130,13 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
(void)snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "firstEp");
SEp firstEp = {0};
SEp firstEp = {0};
TAOS_CHECK_RETURN(taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? defaultFirstEp : pItem->str, &firstEp));
(void)snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
TAOS_CHECK_RETURN(cfgSetItem(pCfg, "firstEp", tsFirst, pItem->stype, true));
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "secondEp");
SEp secondEp = {0};
SEp secondEp = {0};
TAOS_CHECK_RETURN(taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? defaultFirstEp : pItem->str, &secondEp));
(void)snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
TAOS_CHECK_RETURN(cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype, true));
@ -1622,8 +1624,8 @@ static int32_t taosSetAllDebugFlag(SConfig *pCfg, int32_t flag);
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SConfig *pCfg = NULL;
if (tsCfg == NULL) {
@ -1691,7 +1693,7 @@ int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char *
TAOS_CHECK_RETURN(cfgInit(&pCfg));
TAOS_CHECK_GOTO(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE), NULL, _exit);
TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) ,NULL, _exit);
TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER), NULL, _exit);
if ((code = taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl)) != 0) {
printf("failed to load cfg since %s\n", tstrerror(code));
@ -1720,7 +1722,7 @@ _exit:
static int32_t taosCheckGlobalCfg() {
uint32_t ipv4 = 0;
int32_t code = taosGetIpv4FromFqdn(tsLocalFqdn, &ipv4);
int32_t code = taosGetIpv4FromFqdn(tsLocalFqdn, &ipv4);
if (code) {
uError("failed to get ip from fqdn:%s since %s, dnode can not be initialized", tsLocalFqdn, tstrerror(code));
TAOS_RETURN(TSDB_CODE_RPC_FQDN_ERROR);
@ -1825,7 +1827,7 @@ typedef struct {
static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) {
int32_t code = TSDB_CODE_CFG_NOT_FOUND;
char *name = pItem->name;
char *name = pItem->name;
for (int32_t d = 0; d < optionSize; ++d) {
const char *optName = pOptions[d].optionName;
if (strcasecmp(name, optName) != 0) continue;
@ -2012,8 +2014,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
}
case 'f': {
if (strcasecmp("fqdn", name) == 0) {
SConfigItem* pFqdnItem = cfgGetItem(pCfg, "fqdn");
SConfigItem* pServerPortItem = cfgGetItem(pCfg, "serverPort");
SConfigItem *pFqdnItem = cfgGetItem(pCfg, "fqdn");
SConfigItem *pServerPortItem = cfgGetItem(pCfg, "serverPort");
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
if (pFqdnItem == NULL || pServerPortItem == NULL || pFirstEpItem == NULL) {
uError("failed to get fqdn or serverPort or firstEp from cfg");
@ -2028,7 +2030,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
char defaultFirstEp[TSDB_EP_LEN] = {0};
(void)snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
SEp firstEp = {0};
SEp firstEp = {0};
TAOS_CHECK_GOTO(
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp), &lino,
_out);
@ -2068,8 +2070,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
}
case 'l': {
if (strcasecmp("locale", name) == 0) {
SConfigItem* pLocaleItem = cfgGetItem(pCfg, "locale");
SConfigItem* pCharsetItem = cfgGetItem(pCfg, "charset");
SConfigItem *pLocaleItem = cfgGetItem(pCfg, "locale");
SConfigItem *pCharsetItem = cfgGetItem(pCfg, "charset");
if (pLocaleItem == NULL || pCharsetItem == NULL) {
uError("failed to get locale or charset from cfg");
code = TSDB_CODE_CFG_NOT_FOUND;
@ -2147,7 +2149,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
char defaultFirstEp[TSDB_EP_LEN] = {0};
(void)snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
SEp firstEp = {0};
SEp firstEp = {0};
TAOS_CHECK_GOTO(
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp), &lino,
_out);
@ -2275,7 +2277,7 @@ int32_t taosSetGlobalDebugFlag(int32_t flag) { return taosSetAllDebugFlag(tsCfg,
// NOTE: set all command does not change the tmrDebugFlag
static int32_t taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) {
if (flag < 0) TAOS_RETURN(TSDB_CODE_INVALID_PARA);
if (flag == 0) TAOS_RETURN(TSDB_CODE_SUCCESS); // just ignore
if (flag == 0) TAOS_RETURN(TSDB_CODE_SUCCESS); // just ignore
SArray *noNeedToSetVars = NULL;
SConfigItem *pItem = NULL;

View File

@ -351,12 +351,13 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list
typedef struct STransReq {
queue q;
uv_write_t wreq;
queue q;
queue node;
void* conn;
} STransReq;
void transReqQueueInit(queue* q);
void* transReqQueuePush(queue* q);
void* transReqQueuePush(queue* q, STransReq* req);
void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q);

View File

@ -154,7 +154,7 @@ typedef struct SCliThrd {
int32_t (*notifyCb)(void* arg, SCliReq* pReq, STransMsg* pResp);
int32_t (*notifyExceptCb)(void* arg, SCliReq* pReq, STransMsg* pResp);
SHashObj* pIdConnTable;
SHashObj* pIdConnTable; // <qid, conn>
} SCliThrd;
typedef struct SCliObj {
@ -209,6 +209,18 @@ static void cliSendBatchCb(uv_write_t* req, int status);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
void destroyCliConnQTable(SCliConn* conn) {
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter != NULL) {
int64_t* qid = taosHashGetKey(pIter, NULL);
STransCtx* ctx = pIter;
transCtxCleanup(ctx);
pIter = taosHashIterate(conn->pQTable, pIter);
}
taosHashCleanup(conn->pQTable);
conn->pQTable = NULL;
}
// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static int32_t allocConnRef(SCliConn* conn, bool update);
@ -266,7 +278,7 @@ static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn);
int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
@ -503,7 +515,9 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->pCont = transContFromHead((char*)pHead);
pResp->code = pHead->code;
pResp->msgType = pHead->msgType;
pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL;
if (pResp->info.ahandle == 0) {
pResp->info.ahandle = (pReq && pReq->ctx) ? pReq->ctx->ahandle : NULL;
}
pResp->info.traceId = pHead->traceId;
pResp->info.hasEpSet = pHead->hasEpSet;
pResp->info.cliVer = htonl(pHead->compatibilityVer);
@ -513,11 +527,17 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->info.handle = (void*)qid;
return 0;
}
int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) {
int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
int64_t qId = taosHton64(pHead->qid);
int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId;
tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId);
STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId));
transCtxCleanup(p);
code = taosHashRemove(conn->pQTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("%s conn %p failed to release req %ld from conn", CONN_GET_INST_LABEL(conn), conn, qId);
@ -527,21 +547,41 @@ int32_t cliConnMayHandleReleasReq(SCliConn* conn, STransMsgHead* pHead) {
if (code != 0) {
tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId);
}
STraceId* trace = &pHead->traceId;
tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId);
tDebug("%s %p req size:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqs));
for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReqs = transQueueGet(&conn->reqs, i);
if (pReqs->msg.info.qId == qId) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->msg.info.qId == qId) {
transQueueRm(&conn->reqs, i);
destroyReq(pReqs);
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pReq->ctx->ahandle);
}
destroyReq(pReq);
i--;
}
}
taosMemoryFree(pHead);
return 1;
}
return 0;
}
int32_t cliMayHandleState(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp) {
int32_t code = 0;
int64_t qId = taosHton64(pHead->qid);
if (qId == 0) {
return 0;
}
STransCtx* pCtx = taosHashGet(conn->pQTable, &qId, sizeof(qId));
if (pCtx == 0) {
return TSDB_CODE_RPC_NO_STATE;
}
pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType);
tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(conn), conn, pResp->info.ahandle,
TMSG_INFO(pHead->msgType));
return 0;
}
void cliHandleResp2(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
@ -565,33 +605,38 @@ void cliHandleResp2(SCliConn* conn) {
// TODO: notify cb
return;
}
if (cliConnMayHandleReleasReq(conn, pHead)) {
if (cliMayRecycleConn(conn)) {
return;
}
return;
}
int64_t qId = taosHton64(pHead->qid);
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
int32_t seq = htonl(pHead->seqNum);
STransMsg resp = {0};
int32_t seq = htonl(pHead->seqNum);
code = cliGetReqBySeq(conn, seq, &pReq);
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
tstrerror(code));
// TODO: notify cb
if (cliConnMayHandleState_releaseReq(conn, pHead)) {
if (cliMayRecycleConn(conn)) {
return;
}
return;
}
code = cliGetReqBySeq(conn, seq, &pReq);
if (code == TSDB_CODE_OUT_OF_RANGE) {
code = cliMayHandleState(conn, pHead, &resp);
if (code == 0) {
code = cliBuildRespFromCont(NULL, &resp, pHead);
code = cliNotifyCb(conn, NULL, &resp);
return;
}
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
tstrerror(code));
// TODO: notify cb
if (cliMayRecycleConn(conn)) {
return;
}
return;
}
}
STransMsg resp = {0};
code = cliBuildRespFromCont(pReq, &resp, pHead);
STraceId* trace = &resp.info.traceId;
if (code != 0) {
@ -1195,8 +1240,9 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
code = cliMayUpdateState(pThrd, pReq, pConn);
transQueuePush(&pConn->reqs, pReq);
addConnToHeapCache(pThrd->connHeapCache, pConn);
transQueuePush(&pConn->reqs, pReq);
return cliDoConn(pThrd, pConn);
_exception:
// free conn
@ -1264,6 +1310,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
_failed:
if (conn) {
taosMemoryFree(conn->stream);
destroyCliConnQTable(conn);
taosHashCleanup(conn->pQTable);
(void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqs);
@ -1339,10 +1387,10 @@ static void cliDestroy(uv_handle_t* handle) {
}
cliDestroyConnMsgs(conn, true);
destroyCliConnQTable(conn);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue);
taosHashCleanup(conn->pQTable);
(void)transDestroyBuffer(&conn->readBuf);
taosMemoryFree(conn);
@ -1673,7 +1721,7 @@ void cliConnCb(uv_connect_t* req, int status) {
cliConnSetSockInfo(pConn);
addConnToHeapCache(pThrd->connHeapCache, pConn);
// addConnToHeapCache(pThrd->connHeapCache, pConn);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
(void)cliSend2(pConn);
@ -1941,21 +1989,22 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg);
}
int32_t cliConnHandleQueryById(SCliReq* pReq) {
if (pReq->msg.info.handle == 0) {
return 0;
int32_t clConnMayUpdateReqCtx(SCliConn* pConn, SCliReq* pReq) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
SReqCtx* pCtx = pReq->ctx;
STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (pUserCtx == NULL) {
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("succ to add conn %p of statue ctx, qid:%ld", pConn, qid);
} else {
int64_t queryId = (int64_t)pReq->msg.info.handle;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), queryId);
if (exh->inited == 1) {
} else {
}
transReleaseExHandle(transGetRefMgt(), queryId);
transCtxMerge(pUserCtx, &pCtx->userCtx);
tDebug("succ to update conn %p of statue ctx, qid:%ld", pConn, qid);
}
return 0;
}
int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
int32_t code = 0;
int64_t qid = pReq->msg.info.qId;
if (qid == 0) {
@ -1965,12 +2014,13 @@ int32_t cliMayGetHandleState(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
if (pState == NULL) {
tDebug("failed to get statue, qid:%ld", qid);
// ASSERT(0);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else {
*pConn = pState->conn;
tDebug("succ to get conn of statue, qid:%ld", qid);
}
return code;
return 0;
}
int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
@ -1984,6 +2034,7 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
tDebug("succ to get conn %p of statue, qid:%ld", pConn, qid);
ASSERT(0);
}
SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
if (code != 0) {
@ -1992,13 +2043,7 @@ int32_t cliMayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) {
tDebug("succ to add conn %p of statue, qid:%ld (1)", pConn, qid);
}
int32_t dummy = 0;
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &dummy, sizeof(dummy));
if (code != 0) {
tDebug("failed to add conn %p of statue, qid:%ld", pConn, qid);
} else {
tDebug("succ to add conn %p of statue, qid:%ld(2)", pConn, qid);
}
(void)clConnMayUpdateReqCtx(pConn, pReq);
return code;
}
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
@ -2009,7 +2054,10 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
STrans* pInst = pThrd->pInst;
SCliConn* pConn = NULL;
code = cliMayGetHandleState(pThrd, pReq, &pConn);
code = cliMayGetStateByQid(pThrd, pReq, &pConn);
if (code == 0) {
(void)clConnMayUpdateReqCtx(pConn, pReq);
}
if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
char addr[TSDB_FQDN_LEN + 64] = {0};
@ -2933,9 +2981,13 @@ void cliMayResetRespCode(SCliReq* pReq, STransMsg* pResp) {
int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx;
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STraceId* trace = &pResp->info.traceId;
if (pCtx == NULL) {
pInst->cfp(pInst->parent, pResp, NULL);
return 0;
}
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pSem) {
@ -2978,14 +3030,16 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
if (cliMayRetry(pConn, pReq, pResp)) {
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
}
if (pReq != NULL) {
if (cliMayRetry(pConn, pReq, pResp)) {
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
}
cliMayResetRespCode(pReq, pResp);
cliMayResetRespCode(pReq, pResp);
if (cliTryUpdateEpset(pReq, pResp)) {
cliPerfLog_epset(pConn, pReq);
if (cliTryUpdateEpset(pReq, pResp)) {
cliPerfLog_epset(pConn, pReq);
}
}
return cliNotifyImplCb(pConn, pReq, pResp);
}

View File

@ -15,7 +15,7 @@
#include "transComm.h"
#define BUFFER_CAP 4096
#define BUFFER_CAP 16 * 4096
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
@ -341,7 +341,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
void transCtxInit(STransCtx* ctx) {
// init transCtx
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
void transCtxCleanup(STransCtx* ctx) {
if (ctx->args == NULL) {
@ -350,6 +350,8 @@ void transCtxCleanup(STransCtx* ctx) {
STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
while (iter) {
int32_t* type = taosHashGetKey(iter, NULL);
tDebug("free msg type %s dump func", TMSG_INFO(*type));
ctx->freeFunc(iter->val);
iter = taosHashIterate(ctx->args, iter);
}
@ -409,27 +411,22 @@ void transReqQueueInit(queue* q) {
// init req queue
QUEUE_INIT(q);
}
void* transReqQueuePush(queue* q) {
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
if (req == NULL) {
return NULL;
}
req->wreq.data = req;
QUEUE_PUSH(q, &req->q);
return &req->wreq;
void* transReqQueuePush(queue* q, STransReq* userReq) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = userReq;
QUEUE_PUSH(q, &userReq->q);
return req;
}
void* transReqQueueRemove(void* arg) {
void* ret = NULL;
uv_write_t* wreq = arg;
uv_write_t* req = arg;
STransReq* req = wreq ? wreq->data : NULL;
STransReq* userReq = req ? req->data : NULL;
if (req == NULL) return NULL;
QUEUE_REMOVE(&req->q);
QUEUE_REMOVE(&userReq->q);
ret = wreq && wreq->handle ? wreq->handle->data : NULL;
taosMemoryFree(req);
return ret;
return userReq;
}
void transReqQueueClear(queue* q) {
while (!QUEUE_IS_EMPTY(q)) {

View File

@ -72,6 +72,9 @@ typedef struct SSvrMsg {
void* arg;
FilteFunc func;
int8_t sent;
queue sendReq;
} SSvrMsg;
typedef struct {
@ -618,17 +621,35 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SSvrConn* conn = transReqQueueRemove(req);
if (conn == NULL) return;
queue q;
QUEUE_INIT(&q);
STransReq* userReq = transReqQueueRemove(req);
SSvrConn* conn = userReq->conn;
queue* src = &userReq->node;
while (!QUEUE_IS_EMPTY(src)) {
queue* head = QUEUE_HEAD(src);
QUEUE_REMOVE(head);
QUEUE_PUSH(&q, head);
// }
}
// QUEUE_MOVE(src, &q);
tDebug("%s conn %p send data", transLabel(conn->pInst), conn);
if (status == 0) {
for (int32_t i = 0; i < transQueueSize(&conn->srvMsgs); i++) {
SSvrMsg* smsg = transQueueGet(&conn->srvMsgs, i);
if (smsg->sent == 1) {
transQueueRm(&conn->srvMsgs, i);
destroySmsg(smsg);
i--;
}
while (!QUEUE_IS_EMPTY(&q)) {
queue* head = QUEUE_HEAD(&q);
QUEUE_REMOVE(head);
SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg);
}
} else {
if (!uv_is_closing((uv_handle_t*)(conn->pTcp))) {
@ -705,7 +726,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb->len = len;
return 0;
}
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum) {
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* sendReqNode) {
int32_t count = 0;
int32_t size = transQueueSize(&pConn->srvMsgs);
@ -713,7 +734,9 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf
if (pWb == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < size; i++) {
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size);
for (int32_t i = 0; i < transQueueSize(&pConn->srvMsgs); i++) {
SSvrMsg* pMsg = transQueueGet(&pConn->srvMsgs, i);
if (pMsg->sent == 1) {
continue;
@ -721,8 +744,12 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf
uv_buf_t wb;
(void)uvPrepareSendData(pMsg, &wb);
pWb[count] = wb;
pMsg->sent = 1;
QUEUE_PUSH(sendReqNode, &pMsg->sendReq);
transQueueRm(&pConn->srvMsgs, i);
i--;
count++;
}
@ -743,20 +770,30 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
if (pConn->broken) {
return;
}
queue sendReqNode;
QUEUE_INIT(&sendReqNode);
uv_buf_t* pBuf = NULL;
int32_t bufNum = 0;
code = uvBuildToSendData(pConn, &pBuf, &bufNum);
code = uvBuildToSendData(pConn, &pBuf, &bufNum, &sendReqNode);
if (code != 0) {
tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn);
return;
}
if (bufNum == 0) {
tDebug("%s conn %p no data to send", transLabel(pConn->pInst), pConn);
return;
}
transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq));
pWreq->conn = pConn;
QUEUE_INIT(&pWreq->q);
QUEUE_MOVE(&sendReqNode, &pWreq->node);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue, pWreq);
if (req == NULL) {
if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) {
tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY));