diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index cc9d789430..c1265d768c 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -63,7 +63,9 @@ typedef struct SRpcHandleInfo { int8_t forbiddenIp; int8_t notFreeAhandle; int8_t compressed; - int32_t seqNum; + int32_t seqNum; // msg seq + int64_t qId; // queryId Get from client, other req's qId = -1; + int32_t refIdMgt; } SRpcHandleInfo; typedef struct SRpcMsg { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9ab2d918b8..df7b4f8fcf 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -183,7 +183,8 @@ typedef struct { uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client - uint32_t code; // del later + int64_t qid; + uint32_t code; // del later uint32_t msgType; int32_t msgLen; int32_t seqNum; @@ -272,10 +273,10 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); } \ } while (0) -#define ASYNC_CHECK_HANDLE(exh1, id) \ +#define ASYNC_CHECK_HANDLE(idMgt, id, exh1) \ do { \ if (id > 0) { \ - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ + SExHandle* exh2 = transAcquireExHandle(idMgt, id); \ if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \ exh2 ? exh2->refId : 0, id); \ diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 27a92fb483..85d064b68f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -58,6 +58,9 @@ typedef struct SSvrConn { char ckey[TSDB_PASSWORD_LEN]; // ciphering key int64_t whiteListVer; + + // state req dict + SHashObj* pQTable; } SSvrConn; typedef struct SSvrMsg { @@ -98,6 +101,8 @@ typedef struct SWorkThrd { SIpWhiteListTab* pWhiteList; int64_t whiteListVer; int8_t enableIpWhiteList; + + int32_t connRefMgt; } SWorkThrd; typedef struct SServerObj { @@ -166,6 +171,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); static int32_t reallocConnRef(SSvrConn* conn); +int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; } + static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); @@ -447,6 +454,7 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn); return false; } + pHead->ahandle = htole64(pHead->ahandle); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -470,18 +478,29 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; + transMsg.info.qId = htole64(pHead->qid); + + if (transMsg.info.qId > 0) { + int32_t code = taosHashPut(pConn->pQTable, &transMsg.info.qId, sizeof(int64_t), &transMsg, sizeof(STransMsg)); + if (code != 0) { + tError("%s conn %p failed to put msg to req dict, since %s", transLabel(pInst), pConn, tstrerror(code)); + return false; + } + } if (pHead->seqNum == 0) { ASSERT(0); } + + transMsg.info.handle = (void*)transAcquireExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId); + transMsg.info.refIdMgt = pThrd->connRefMgt; + + ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg"); + // pHead->noResp = 1, // 1. server application should not send resp on handle // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist - transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); - ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg"); - transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId; transMsg.info.traceId = pHead->traceId; transMsg.info.cliVer = htonl(pHead->compatibilityVer); @@ -489,7 +508,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0; transMsg.info.seqNum = htonl(pHead->seqNum); - uvMaySetConnAcquired(pConn, pHead); + // uvMaySetConnAcquired(pConn, pHead); uvPerfLog_receive(pConn, pHead, &transMsg); @@ -499,7 +518,7 @@ static bool uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = pConn->port; tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - (void)transReleaseExHandle(transGetRefMgt(), pConn->refId); + (void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId); (*pInst->cfp)(pInst->parent, &transMsg, NULL); return true; @@ -647,7 +666,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - pHead->ahandle = (uint64_t)pMsg->info.ahandle; + // pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); @@ -798,15 +817,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { int64_t refId = transMsg.info.refId; msg->seqNum = transMsg.info.seqNum; - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + SExHandle* exh2 = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -836,12 +855,6 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { } static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - // int32_t code = reallocConnRef(pConn); - - // if (code != 0) { - // destroyConn(pConn, true); - // return true; - // } tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; (void)transClearBuffer(&pConn->readBuf); @@ -874,53 +887,6 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { } return false; } -static void uvPrepareCb(uv_prepare_t* handle) { - // prepare callback - SWorkThrd* pThrd = handle->data; - SAsyncPool* pool = pThrd->asyncPool; - - for (int i = 0; i < pool->nAsync; i++) { - uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; - - queue wq; - (void)taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); - (void)taosThreadMutexUnlock(&item->mtx); - - while (!QUEUE_IS_EMPTY(&wq)) { - queue* head = QUEUE_HEAD(&wq); - QUEUE_REMOVE(head); - - SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); - if (msg == NULL) { - tError("unexcept occurred, continue"); - continue; - } - // release handle to rpc init - if (msg->type == Quit || msg->type == Update) { - (*transAsyncHandle[msg->type])(msg, pThrd); - continue; - } else { - STransMsg transMsg = msg->msg; - - SExHandle* exh1 = transMsg.info.handle; - int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); - if (exh2 == NULL || exh1 != exh2) { - tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(transGetRefMgt(), refId); - destroySmsg(msg); - continue; - } - msg->pConn = exh1->handle; - (void)transReleaseExHandle(transGetRefMgt(), refId); - (*transAsyncHandle[msg->type])(msg, pThrd); - } - } - } -} - static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task // only auth conn currently, add more func later @@ -1011,7 +977,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } if (pThrd->quit) { tWarn("thread already received quit msg, ignore incoming conn"); - // uv_close((uv_handle_t*)q, NULL); return; } @@ -1022,16 +987,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - // pConn->pInst = pThrd->pInst; - // /* init conn timer*/ - // // uv_timer_init(pThrd->loop, &pConn->pTimer); - // // pConn->pTimer.data = pConn; - // pConn->hostThrd = pThrd; - // // init client handle - // pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); - // uv_tcp_init(pThrd->loop, pConn->pTcp); - // pConn->pTcp->data = pConn; - if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; (void)uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); @@ -1122,25 +1077,6 @@ static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->msg); - pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - if (pThrd->prepare == NULL) { - tError("failed to init prepare"); - return TSDB_CODE_OUT_OF_MEMORY; - } - - code = uv_prepare_init(pThrd->loop, pThrd->prepare); - if (code != 0) { - tError("failed to init prepare since %s", uv_err_name(code)); - return TSDB_CODE_THIRDPARTY_ERROR; - } - - code = uv_prepare_start(pThrd->prepare, uvPrepareCb); - if (code != 0) { - tError("failed to start prepare since %s", uv_err_name(code)); - return TSDB_CODE_THIRDPARTY_ERROR; - } - pThrd->prepare->data = pThrd; - // conn set QUEUE_INIT(&pThrd->conn); @@ -1244,14 +1180,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = transAddExHandle(transGetRefMgt(), exh); + exh->refId = transAddExHandle(uvGetConnRefOfThrd(pThrd), exh); if (exh->refId < 0) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, &lino, _end); } QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), exh->refId); if (pSelf != exh) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); } @@ -1263,11 +1199,11 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId); - pConn->pInst = pThrd->pInst; - /* init conn timer*/ - // uv_timer_init(pThrd->loop, &pConn->pTimer); - // pConn->pTimer.data = pConn; - pConn->hostThrd = pThrd; + pConn->pQTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pConn->pQTable == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); + } + // init client handle pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); if (pConn->pTcp == NULL) { @@ -1282,11 +1218,15 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { pConn->pTcp->data = pConn; QUEUE_PUSH(&pThrd->conn, &pConn->queue); + pConn->pInst = pThrd->pInst; + pConn->hostThrd = pThrd; + return pConn; _end: if (pConn) { transQueueDestroy(&pConn->srvMsgs); (void)transDestroyBuffer(&pConn->readBuf); + taosHashCleanup(pConn->pQTable); taosMemoryFree(pConn->pTcp); taosMemoryFree(pConn); pConn = NULL; @@ -1315,8 +1255,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { } static int32_t reallocConnRef(SSvrConn* conn) { if (conn->refId > 0) { - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + (void)transReleaseExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId); + (void)transRemoveExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId); } // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); @@ -1326,14 +1266,14 @@ static int32_t reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(transGetRefMgt(), exh); + exh->refId = transAddExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh); if (exh->refId < 0) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; } QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh->refId); if (pSelf != exh) { tError("conn %p failed to acquire handle", conn); taosMemoryFree(exh); @@ -1352,8 +1292,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + (void)transReleaseExHandle(uvGetConnRefOfThrd(thrd), conn->refId); + (void)transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId); STrans* pInst = thrd->pInst; tDebug("%s conn %p destroy", transLabel(pInst), conn); @@ -1366,6 +1306,8 @@ static void uvDestroyConn(uv_handle_t* handle) { transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); + + taosHashCleanup(conn->pQTable); taosMemoryFree(conn->pTcp); destroyConnRegArg(conn); (void)transDestroyBuffer(&conn->readBuf); @@ -1512,6 +1454,12 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } + thrd->connRefMgt = transOpenRefMgt(50000, transDestroyExHandle); + if (thrd->connRefMgt < 0) { + code = thrd->connRefMgt; + goto End; + } + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); if (srv->pipe[i] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1603,6 +1551,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { } else if (conn->status == ConnRelease || conn->status == ConnNormal) { tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn); } + destroySmsg(msg); } void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { @@ -1610,32 +1559,30 @@ void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn); uvStartSendResp(msg); } + +int32_t uvHandleStateReq(SSvrMsg* msg) { + int32_t code = 0; + SSvrConn* conn = msg->pConn; + tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, + msg->msg.info.qId); + + SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg}; + SSvrRegArg* p = taosHashGet(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId)); + if (p != NULL) { + transFreeMsg(p->msg.pCont); + } + + code = taosHashPut(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId), &arg, sizeof(arg)); + if (code == 0) tDebug("conn %p register brokenlink callback succ", conn); + return code; +} void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { SSvrConn* conn = msg->pConn; tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn); - if (conn->status == ConnAcquire) { - if (!transQueuePush(&conn->srvMsgs, msg)) { - return; - } - (void)transQueuePop(&conn->srvMsgs); - - if (conn->regArg.init) { - transFreeMsg(conn->regArg.msg.pCont); - conn->regArg.init = 0; - } - conn->regArg.notifyCount = 0; - conn->regArg.init = 1; - conn->regArg.msg = msg->msg; - tDebug("conn %p register brokenlink callback succ", conn); - - if (conn->broken) { - STrans* pInst = conn->pInst; - (*pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL); - memset(&conn->regArg, 0, sizeof(conn->regArg)); - } - taosMemoryFree(msg); - } + int32_t code = uvHandleStateReq(msg); + taosMemoryFree(msg); } + void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { SUpdateIpWhite* req = msg->arg; if (req == NULL) { @@ -1752,7 +1699,7 @@ int32_t transReleaseSrvHandle(void* handle) { SExHandle* exh = info->handle; int64_t refId = info->refId; - ASYNC_CHECK_HANDLE(exh, refId); + ASYNC_CHECK_HANDLE(info->refIdMgt, refId, exh); SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); @@ -1771,15 +1718,15 @@ int32_t transReleaseSrvHandle(void* handle) { tDebug("%s conn %p start to release", transLabel(pThrd->pInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(info->refIdMgt, refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(info->refIdMgt, refId); return 0; _return1: tDebug("handle %p failed to send to release handle", exh); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(info->refIdMgt, refId); return code; _return2: tDebug("handle %p failed to send to release handle", exh); @@ -1801,7 +1748,7 @@ int32_t transSendResponse(const STransMsg* msg) { return 0; } int64_t refId = msg->info.refId; - ASYNC_CHECK_HANDLE(exh, refId); + ASYNC_CHECK_HANDLE(msg->info.refIdMgt, refId, exh); STransMsg tmsg = *msg; tmsg.info.refId = refId; @@ -1822,17 +1769,17 @@ int32_t transSendResponse(const STransMsg* msg) { tGDebug("conn %p start to send resp (1/2)", exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(msg->info.refIdMgt, refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(msg->info.refIdMgt, refId); return 0; _return1: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(msg->info.refIdMgt, refId); return code; _return2: tDebug("handle %p failed to send resp", exh); @@ -1844,12 +1791,15 @@ int32_t transRegisterMsg(const STransMsg* msg) { SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; - ASYNC_CHECK_HANDLE(exh, refId); + ASYNC_CHECK_HANDLE(msg->info.refIdMgt, refId, exh); STransMsg tmsg = *msg; tmsg.info.noResp = 1; + tmsg.info.qId = msg->info.qId; + tmsg.info.seqNum = msg->info.seqNum; tmsg.info.refId = refId; + tmsg.info.refIdMgt = msg->info.refIdMgt; SWorkThrd* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); @@ -1867,17 +1817,17 @@ int32_t transRegisterMsg(const STransMsg* msg) { tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(msg->info.refIdMgt, refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(msg->info.refIdMgt, refId); return 0; _return1: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(msg->info.refIdMgt, refId); return code; _return2: tDebug("handle %p failed to register brokenlink", exh);