opt transport

This commit is contained in:
yihaoDeng 2024-09-06 21:53:52 +08:00
parent 3940997194
commit f797c8ed2e
3 changed files with 97 additions and 144 deletions

View File

@ -63,7 +63,9 @@ typedef struct SRpcHandleInfo {
int8_t forbiddenIp;
int8_t notFreeAhandle;
int8_t compressed;
int32_t seqNum;
int32_t seqNum; // msg seq
int64_t qId; // queryId Get from client, other req's qId = -1;
int32_t refIdMgt;
} SRpcHandleInfo;
typedef struct SRpcMsg {

View File

@ -183,6 +183,7 @@ typedef struct {
uint32_t magicNum;
STraceId traceId;
uint64_t ahandle; // ahandle assigned by client
int64_t qid;
uint32_t code; // del later
uint32_t msgType;
int32_t msgLen;
@ -272,10 +273,10 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
} \
} while (0)
#define ASYNC_CHECK_HANDLE(exh1, id) \
#define ASYNC_CHECK_HANDLE(idMgt, id, exh1) \
do { \
if (id > 0) { \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
SExHandle* exh2 = transAcquireExHandle(idMgt, id); \
if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
exh2 ? exh2->refId : 0, id); \

View File

@ -58,6 +58,9 @@ typedef struct SSvrConn {
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
int64_t whiteListVer;
// state req dict
SHashObj* pQTable;
} SSvrConn;
typedef struct SSvrMsg {
@ -98,6 +101,8 @@ typedef struct SWorkThrd {
SIpWhiteListTab* pWhiteList;
int64_t whiteListVer;
int8_t enableIpWhiteList;
int32_t connRefMgt;
} SWorkThrd;
typedef struct SServerObj {
@ -166,6 +171,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
static int32_t reallocConnRef(SSvrConn* conn);
int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; }
static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
@ -447,6 +454,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn);
return false;
}
pHead->ahandle = htole64(pHead->ahandle);
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
@ -470,18 +478,29 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
transMsg.info.qId = htole64(pHead->qid);
if (transMsg.info.qId > 0) {
int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg));
if (code != 0) {
tError("%s conn %p failed to put msg to req dict, since %s", transLabel(pInst), pConn, tstrerror(code));
return false;
}
}
if (pHead->seqNum == 0) {
ASSERT(0);
}
transMsg.info.handle = (void*)transAcquireExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
transMsg.info.refIdMgt = pThrd->connRefMgt;
ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
// pHead->noResp = 1,
// 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId;
transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
@ -489,7 +508,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
transMsg.info.seqNum = htonl(pHead->seqNum);
uvMaySetConnAcquired(pConn, pHead);
// uvMaySetConnAcquired(pConn, pHead);
uvPerfLog_receive(pConn, pHead, &transMsg);
@ -499,7 +518,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = pConn->port;
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
(void)transReleaseExHandle(transGetRefMgt(), pConn->refId);
(void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId);
(*pInst->cfp)(pInst->parent, &transMsg, NULL);
return true;
@ -647,7 +666,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pMsg->contLen = 0;
}
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
// pHead->ahandle = (uint64_t)pMsg->info.ahandle;
pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
@ -798,15 +817,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
int64_t refId = transMsg.info.refId;
msg->seqNum = transMsg.info.seqNum;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
SExHandle* exh2 = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
@ -836,12 +855,6 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
}
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
// int32_t code = reallocConnRef(pConn);
// if (code != 0) {
// destroyConn(pConn, true);
// return true;
// }
tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId;
(void)transClearBuffer(&pConn->readBuf);
@ -874,53 +887,6 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
}
return false;
}
static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback
SWorkThrd* pThrd = handle->data;
SAsyncPool* pool = pThrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
(void)taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
(void)taosThreadMutexUnlock(&item->mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
if (msg == NULL) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetRefMgt(), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
}
}
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
// only auth conn currently, add more func later
@ -1011,7 +977,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
}
if (pThrd->quit) {
tWarn("thread already received quit msg, ignore incoming conn");
// uv_close((uv_handle_t*)q, NULL);
return;
}
@ -1022,16 +987,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return;
}
// pConn->pInst = pThrd->pInst;
// /* init conn timer*/
// // uv_timer_init(pThrd->loop, &pConn->pTimer);
// // pConn->pTimer.data = pConn;
// pConn->hostThrd = pThrd;
// // init client handle
// pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
// uv_tcp_init(pThrd->loop, pConn->pTcp);
// pConn->pTcp->data = pConn;
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd;
(void)uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
@ -1122,25 +1077,6 @@ static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
QUEUE_INIT(&pThrd->msg);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
if (pThrd->prepare == NULL) {
tError("failed to init prepare");
return TSDB_CODE_OUT_OF_MEMORY;
}
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
if (code != 0) {
tError("failed to init prepare since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
code = uv_prepare_start(pThrd->prepare, uvPrepareCb);
if (code != 0) {
tError("failed to start prepare since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
pThrd->prepare->data = pThrd;
// conn set
QUEUE_INIT(&pThrd->conn);
@ -1244,14 +1180,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
exh->handle = pConn;
exh->pThrd = pThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh);
exh->refId = transAddExHandle(uvGetConnRefOfThrd(pThrd), exh);
if (exh->refId < 0) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, &lino, _end);
}
QUEUE_INIT(&exh->q);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), exh->refId);
if (pSelf != exh) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
}
@ -1263,11 +1199,11 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
pConn->pInst = pThrd->pInst;
/* init conn timer*/
// uv_timer_init(pThrd->loop, &pConn->pTimer);
// pConn->pTimer.data = pConn;
pConn->hostThrd = pThrd;
pConn->pQTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pConn->pQTable == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
// init client handle
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
if (pConn->pTcp == NULL) {
@ -1282,11 +1218,15 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
pConn->pTcp->data = pConn;
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pConn->pInst = pThrd->pInst;
pConn->hostThrd = pThrd;
return pConn;
_end:
if (pConn) {
transQueueDestroy(&pConn->srvMsgs);
(void)transDestroyBuffer(&pConn->readBuf);
taosHashCleanup(pConn->pQTable);
taosMemoryFree(pConn->pTcp);
taosMemoryFree(pConn);
pConn = NULL;
@ -1315,8 +1255,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
}
static int32_t reallocConnRef(SSvrConn* conn) {
if (conn->refId > 0) {
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
(void)transReleaseExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId);
(void)transRemoveExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId);
}
// avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
@ -1326,14 +1266,14 @@ static int32_t reallocConnRef(SSvrConn* conn) {
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh);
exh->refId = transAddExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh);
if (exh->refId < 0) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
QUEUE_INIT(&exh->q);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh->refId);
if (pSelf != exh) {
tError("conn %p failed to acquire handle", conn);
taosMemoryFree(exh);
@ -1352,8 +1292,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
}
SWorkThrd* thrd = conn->hostThrd;
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
(void)transReleaseExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
(void)transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
STrans* pInst = thrd->pInst;
tDebug("%s conn %p destroy", transLabel(pInst), conn);
@ -1366,6 +1306,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
transReqQueueClear(&conn->wreqQueue);
QUEUE_REMOVE(&conn->queue);
taosHashCleanup(conn->pQTable);
taosMemoryFree(conn->pTcp);
destroyConnRegArg(conn);
(void)transDestroyBuffer(&conn->readBuf);
@ -1512,6 +1454,12 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End;
}
thrd->connRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
if (thrd->connRefMgt < 0) {
code = thrd->connRefMgt;
goto End;
}
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
if (srv->pipe[i] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1603,6 +1551,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
} else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn);
}
destroySmsg(msg);
}
void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) {
@ -1610,32 +1559,30 @@ void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) {
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);
uvStartSendResp(msg);
}
int32_t uvHandleStateReq(SSvrMsg* msg) {
int32_t code = 0;
SSvrConn* conn = msg->pConn;
tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn,
msg->msg.info.qId);
SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg};
SSvrRegArg* p = taosHashGet(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId));
if (p != NULL) {
transFreeMsg(p->msg.pCont);
}
code = taosHashPut(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId), &arg, sizeof(arg));
if (code == 0) tDebug("conn %p register brokenlink callback succ", conn);
return code;
}
void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
SSvrConn* conn = msg->pConn;
tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn);
if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}
(void)transQueuePop(&conn->srvMsgs);
if (conn->regArg.init) {
transFreeMsg(conn->regArg.msg.pCont);
conn->regArg.init = 0;
}
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
tDebug("conn %p register brokenlink callback succ", conn);
if (conn->broken) {
STrans* pInst = conn->pInst;
(*pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
int32_t code = uvHandleStateReq(msg);
taosMemoryFree(msg);
}
}
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
SUpdateIpWhite* req = msg->arg;
if (req == NULL) {
@ -1752,7 +1699,7 @@ int32_t transReleaseSrvHandle(void* handle) {
SExHandle* exh = info->handle;
int64_t refId = info->refId;
ASYNC_CHECK_HANDLE(exh, refId);
ASYNC_CHECK_HANDLE(info->refIdMgt, refId, exh);
SWorkThrd* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
@ -1771,15 +1718,15 @@ int32_t transReleaseSrvHandle(void* handle) {
tDebug("%s conn %p start to release", transLabel(pThrd->pInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(info->refIdMgt, refId);
return code;
}
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(info->refIdMgt, refId);
return 0;
_return1:
tDebug("handle %p failed to send to release handle", exh);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(info->refIdMgt, refId);
return code;
_return2:
tDebug("handle %p failed to send to release handle", exh);
@ -1801,7 +1748,7 @@ int32_t transSendResponse(const STransMsg* msg) {
return 0;
}
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
ASYNC_CHECK_HANDLE(msg->info.refIdMgt, refId, exh);
STransMsg tmsg = *msg;
tmsg.info.refId = refId;
@ -1822,17 +1769,17 @@ int32_t transSendResponse(const STransMsg* msg) {
tGDebug("conn %p start to send resp (1/2)", exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
}
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
return 0;
_return1:
tDebug("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
_return2:
tDebug("handle %p failed to send resp", exh);
@ -1844,12 +1791,15 @@ int32_t transRegisterMsg(const STransMsg* msg) {
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
ASYNC_CHECK_HANDLE(msg->info.refIdMgt, refId, exh);
STransMsg tmsg = *msg;
tmsg.info.noResp = 1;
tmsg.info.qId = msg->info.qId;
tmsg.info.seqNum = msg->info.seqNum;
tmsg.info.refId = refId;
tmsg.info.refIdMgt = msg->info.refIdMgt;
SWorkThrd* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
@ -1867,17 +1817,17 @@ int32_t transRegisterMsg(const STransMsg* msg) {
tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
}
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
return 0;
_return1:
tDebug("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(msg->info.refIdMgt, refId);
return code;
_return2:
tDebug("handle %p failed to register brokenlink", exh);