refactor transport

This commit is contained in:
Yihao Deng 2024-06-14 12:57:28 +00:00
parent 3cf063e135
commit e9c358c9d4
4 changed files with 181 additions and 102 deletions

View File

@ -187,6 +187,7 @@ typedef struct {
uint32_t code; // del later uint32_t code; // del later
uint32_t msgType; uint32_t msgType;
int32_t msgLen; int32_t msgLen;
int32_t seqNum;
uint8_t content[0]; // message body starts from here uint8_t content[0]; // message body starts from here
} STransMsgHead; } STransMsgHead;

View File

@ -84,6 +84,7 @@ typedef struct SCliConn {
char dst[32]; char dst[32];
int64_t refId; int64_t refId;
int32_t seq;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
@ -96,6 +97,7 @@ typedef struct SCliMsg {
uint64_t st; uint64_t st;
int sent; //(0: no send, 1: alread sent) int sent; //(0: no send, 1: alread sent)
queue seqq; queue seqq;
int32_t seqNum;
} SCliMsg; } SCliMsg;
typedef struct SCliThrd { typedef struct SCliThrd {
@ -402,6 +404,7 @@ void cliResetConnTimer(SCliConn* conn) {
} }
} }
void cliHandleBatchResp(SCliConn* conn) { void cliHandleBatchResp(SCliConn* conn) {
ASSERT(0);
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
cliResetConnTimer(conn); cliResetConnTimer(conn);
@ -430,9 +433,70 @@ void cliHandleBatchResp(SCliConn* conn) {
return; return;
} }
} }
SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) {
SCliMsg* pMsg = NULL;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
pMsg = transQueueGet(&conn->cliMsgs, i);
if (pMsg->seqNum == seqNum) {
transQueueRm(&conn->cliMsgs, i);
break;
}
}
if (pMsg == NULL) {
ASSERT(0);
}
return pMsg;
}
void cliHandleResp_shareConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
cliResetConnTimer(conn);
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
if (msgLen <= 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
transMsg.code = pHead->code;
transMsg.msgType = pHead->msgType;
transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
SCliMsg* pMsg = cliFindMsgBySeqnum(conn, pHead->seqNum);
pMsg->seqNum = 0;
STransConnCtx* pCtx = pMsg->ctx;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
STraceId* trace = &transMsg.info.traceId;
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
return;
}
return;
}
void cliHandleResp(SCliConn* conn) { void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
if (pTransInst->shareConn) {
return cliHandleResp_shareConn(conn);
}
cliResetConnTimer(conn); cliResetConnTimer(conn);
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
@ -716,6 +780,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL; conn->task = NULL;
} }
conn->seq++;
return conn; return conn;
} }
@ -813,6 +878,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
} }
cliDestroyConnMsgs(conn, false); cliDestroyConnMsgs(conn, false);
conn->seq = 0;
if (conn->list == NULL) { if (conn->list == NULL) {
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
@ -871,6 +937,7 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
} }
static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
if (handle == 0) return -1;
if (update) { if (update) {
transReleaseExHandle(transGetRefMgt(), conn->refId); transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
@ -958,7 +1025,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn->status = ConnNormal; conn->status = ConnNormal;
conn->broken = false; conn->broken = false;
transRefCliHandle(conn); transRefCliHandle(conn);
conn->seq = 0;
allocConnRef(conn, false); allocConnRef(conn, false);
return conn; return conn;
@ -1078,7 +1145,6 @@ static void cliSendCb(uv_write_t* req, int status) {
static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
SCliConn* conn = req->data; SCliConn* conn = req->data;
SCliThrd* thrd = conn->hostThrd;
if (status != 0) { if (status != 0) {
tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) { if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
@ -1093,7 +1159,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int32_t size = transQueueSize(&pConn->cliMsgs); int32_t size = transQueueSize(&pConn->cliMsgs);
int32_t totalLen = 0;
int32_t totalLen = 0;
if (size == 0) { if (size == 0) {
tError("%s conn %p not msg to send", pTransInst->label, pConn); tError("%s conn %p not msg to send", pTransInst->label, pConn);
cliHandleExcept(pConn); cliHandleExcept(pConn);
@ -1101,10 +1168,14 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
} }
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
int j = 0;
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i); SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i);
if (pCliMsg->sent == 1) {
continue;
}
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
pConn->seq++;
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
@ -1112,7 +1183,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
pMsg->contLen = 0; pMsg->contLen = 0;
} }
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
if (pHead->comp == 0) { if (pHead->comp == 0) {
@ -1129,6 +1201,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
} }
pHead->timestamp = taosHton64(taosGetTimestampUs()); pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->seqNum = pConn->seq;
if (pHead->comp == 0) { if (pHead->comp == 0) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
@ -1138,14 +1211,17 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
} else { } else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
} }
wb[i++] = uv_buf_init((char*)pHead, msgLen); wb[j++] = uv_buf_init((char*)pHead, msgLen);
totalLen += msgLen; totalLen += msgLen;
pCliMsg->sent = 1;
pCliMsg->seqNum = pHead->seqNum;
} }
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn; req->data = pConn;
tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size,
totalLen); totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, size, cliSendBatch_shareConnCb); uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb);
taosMemoryFree(wb); taosMemoryFree(wb);
} }
void cliSendBatch(SCliConn* pConn) { void cliSendBatch(SCliConn* pConn) {
@ -1422,41 +1498,30 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
cliDestroyBatch(p); cliDestroyBatch(p);
taosMemoryFree(req); taosMemoryFree(req);
} }
static void cliHandleFastFail(SCliConn* pConn, int status) {
static void cliHandleFastFail_resp(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
STraceId* trace = &pMsg->msg.info.traceId;
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
}
static void cliHandleFastFail_noresp(SCliConn* pConn, int status) {
tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), pConn,
pConn->dstAddr, uv_strerror(status));
cliDestroyBatch(pConn->pBatch);
pConn->pBatch = NULL;
}
static void cliHandleFastFail(SCliConn* pConn, int status) {
if (status == -1) status = UV_EADDRNOTAVAIL; if (status == -1) status = UV_EADDRNOTAVAIL;
if (pConn->pBatch == NULL) { if (pConn->pBatch == NULL) {
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); cliHandleFastFail_resp(pConn, status);
STraceId* trace = &pMsg->msg.info.traceId;
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr));
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
int32_t elapse = cTimestamp - item->timestamp;
if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
item->count++;
} else {
item->count = 1;
item->timestamp = cTimestamp;
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
}
}
} else { } else {
tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), cliHandleFastFail_noresp(pConn, status);
pConn, pConn->dstAddr, uv_strerror(status));
cliDestroyBatch(pConn->pBatch);
pConn->pBatch = NULL;
} }
cliHandleExcept(pConn); cliHandleExcept(pConn);
} }
@ -1616,9 +1681,8 @@ FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
if (code != 0) return false; if (code != 0) return false;
// if (pCtx->retryCnt == 0) return false;
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true;
return true;
} }
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
@ -1686,11 +1750,12 @@ static void doFreeTimeoutMsg(void* param) {
SCliMsg* pMsg = arg->param1; SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2; SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int32_t code = TSDB_CODE_RPC_MAX_SESSIONS;
QUEUE_REMOVE(&pMsg->q); QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd, code); doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
@ -1718,6 +1783,19 @@ static void addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* p
} }
transHeapInsert(p, pConn); transHeapInsert(p, pConn);
} }
static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* pConn) {
size_t klen = strlen(key);
SHeap* p = taosHashGet(pConnHeapCache, key, klen);
if (p == NULL) {
tDebug("failed to get heap cache for key:%s, no need to del", key);
return;
}
int ret = transHeapDelete(p, pConn);
if (ret != 0) {
tDebug("failed to delete conn %p from heap cache, no need to del", pConn);
}
}
void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
@ -1742,6 +1820,7 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
} }
pConn = cliCreateConn(pThrd); pConn = cliCreateConn(pThrd);
pConn->dstAddr = taosStrdup(addr);
addConnToHeapCache(pThrd->connHeapCache, addr, pConn); addConnToHeapCache(pThrd->connHeapCache, addr, pConn);
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
@ -1790,15 +1869,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
cliSend(conn); cliSend(conn);
} else { } else {
conn = cliCreateConn(pThrd); conn = cliCreateConn(pThrd);
conn->dstAddr = taosStrdup(addr);
int64_t refId = (int64_t)pMsg->msg.info.handle; specifyConnRef(conn, true, (int64_t)pMsg->msg.info.handle);
if (refId != 0) specifyConnRef(conn, true, refId);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg); transQueuePush(&conn->cliMsgs, pMsg);
conn->dstAddr = taosStrdup(addr);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
if (ipaddr == 0xffffffff) { if (ipaddr == 0xffffffff) {
cliResetConnTimer(conn); cliResetConnTimer(conn);
@ -2234,7 +2311,6 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@ -2267,7 +2343,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
void** pIter = taosHashIterate(pThrd->batchCache, NULL); void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) { while (pIter != NULL) {

View File

@ -326,10 +326,6 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) {
STransCtxVal* sVal = (STransCtxVal*)iter; STransCtxVal* sVal = (STransCtxVal*)iter;
key = taosHashGetKey(sVal, &klen); key = taosHashGetKey(sVal, &klen);
// STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
// if (dVal) {
// dst->freeFunc(dVal->val);
// }
taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
iter = taosHashIterate(src->args, iter); iter = taosHashIterate(src->args, iter);
} }

View File

@ -336,6 +336,59 @@ void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
pConn->whiteListVer = pWhite->ver; pConn->whiteListVer = pWhite->ver;
} }
static void uvPerfLog(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) {
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pHead->traceId;
int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp);
static int64_t EXCEPTION_LIMIT_US = 100 * 1000;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn,
TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost));
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus",
transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost));
}
}
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), pTransMsg->info.handle,
pConn, pConn->refId);
}
static int8_t uvValidConn(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst;
SWorkThrd* pThrd = pConn->hostThrd;
int8_t forbiddenIp = 0;
if (pThrd->enableIpWhiteList) {
forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0;
if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
}
return forbiddenIp;
}
static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) {
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
tDebug("conn %p acquired by server app", pConn);
}
}
}
static bool uvHandleReq(SSvrConn* pConn) { static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
SWorkThrd* pThrd = pConn->hostThrd; SWorkThrd* pThrd = pConn->hostThrd;
@ -358,14 +411,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
memcpy(pConn->user, pHead->user, strlen(pHead->user)); memcpy(pConn->user, pHead->user, strlen(pHead->user));
int8_t forbiddenIp = 0;
if (pThrd->enableIpWhiteList) {
forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0;
if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
}
if (uvRecvReleaseReq(pConn, pHead)) { if (uvRecvReleaseReq(pConn, pHead)) {
return true; return true;
} }
@ -384,38 +429,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
if (pConn->status == ConnNormal) { uvMaySetConnAcquired(pConn, pHead);
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
tDebug("conn %p acquired by server app", pConn);
}
}
STraceId* trace = &pHead->traceId;
int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp);
static int64_t EXCEPTION_LIMIT_US = 100 * 1000;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost));
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost));
}
}
// pHead->noResp = 1, // pHead->noResp = 1,
// 1. server application should not send resp on handle // 1. server application should not send resp on handle
@ -423,21 +437,14 @@ static bool uvHandleReq(SSvrConn* pConn) {
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
transMsg.info.refId = pConn->refId; ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp; transMsg.info.forbiddenIp = uvValidConn(pConn);
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, uvPerfLog(pConn, pHead, &transMsg);
pConn->refId);
ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
if (transMsg.info.handle == NULL) {
return false;
}
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
}
// set up conn info // set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.conn); SRpcConnInfo* pConnInfo = &(transMsg.info.conn);