From 9138ea4cec30269359a007f18c990183e2b99a3c Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 29 May 2024 09:20:30 +0000 Subject: [PATCH] opt transport --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 14 +- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 2 + source/libs/transport/src/transCli.c | 209 +++++++++++++++++- 5 files changed, 217 insertions(+), 10 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a2355ddd22..f4e6d42e60 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -372,6 +372,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; + rpcInit.shareConn = 1; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6d97c1cd79..6438601937 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -50,7 +50,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); -static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp); +static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg); @@ -931,13 +931,13 @@ end: return ret; } -static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp) { +static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = -1; SMetaReader mr = {0}; SVDropTtlTableReq ttlReq = {0}; SVFetchTtlExpiredTbsRsp rsp = {0}; SEncoder encoder = {0}; - SArray* pNames = NULL; + SArray *pNames = NULL; pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP; pRsp->code = TSDB_CODE_SUCCESS; pRsp->pCont = NULL; @@ -950,8 +950,8 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids)); - tb_uid_t suid; - char ctbName[TSDB_TABLE_NAME_LEN]; + tb_uid_t suid; + char ctbName[TSDB_TABLE_NAME_LEN]; SVDropTbReq expiredTb = {.igNotExists = true}; metaReaderDoInit(&mr, pVnode->pMeta, 0); rsp.vgId = TD_VID(pVnode); @@ -965,12 +965,12 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* } char buf[TSDB_TABLE_NAME_LEN]; for (int32_t i = 0; i < ttlReq.nUids; ++i) { - tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i); + tb_uid_t *uid = taosArrayGet(ttlReq.pTbUids, i); expiredTb.suid = *uid; terrno = metaReaderGetTableEntryByUid(&mr, *uid); if (terrno < 0) goto _end; strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN); - void* p = taosArrayPush(pNames, buf); + void *p = taosArrayPush(pNames, buf); expiredTb.name = p; if (mr.me.type == TSDB_CHILD_TABLE) { expiredTb.suid = mr.me.ctbEntry.suid; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 232210b53b..8e8672a357 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -75,6 +75,7 @@ typedef struct { void* parent; void* tcphandle; // returned handle from TCP initialization int64_t refId; + int8_t shareConn; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5ed2e00acd..4bfd0c0805 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -112,6 +112,8 @@ void* rpcOpen(const SRpcInit* pInit) { int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); transAcquireExHandle(transGetInstMgt(), refId); pRpc->refId = refId; + + pRpc->shareConn = pInit->shareConn; return (void*)refId; } void rpcClose(void* arg) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5c1b970ceb..e59065ef26 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -76,6 +76,9 @@ typedef struct SCliConn { SDelayTask* task; + HeapNode node; // for heap + int8_t inHeap; + char* dstAddr; char src[32]; char dst[32]; @@ -119,6 +122,7 @@ typedef struct SCliThrd { SHashObj* failFastCache; SHashObj* batchCache; + SHashObj* connHeapCache; SCliMsg* stopMsg; bool quit; @@ -230,6 +234,23 @@ static void destroyThrdObj(SCliThrd* pThrd); static void cliWalkCb(uv_handle_t* handle, void* arg); +typedef struct { + void* p; + HeapNode node; +} SHeapNode; +typedef struct { + // void* p; + Heap* heap; + int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b); +} SHeap; + +int32_t compareHeapNode(const HeapNode* a, const HeapNode* b); +int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)); +void transHeapDestroy(SHeap* heap); +void transHeapGet(SHeap* heap, SCliConn** p); +int transHeapInsert(SHeap* heap, SCliConn* p); +int transHeapDelete(SHeap* heap, SCliConn* p); + #define CLI_RELEASE_UV(loop) \ do { \ uv_walk(loop, cliWalkCb, NULL); \ @@ -1054,6 +1075,66 @@ static void cliSendCb(uv_write_t* req, int status) { } uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } + +void cliSendBatch_shareConn(SCliConn* pConn) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + int32_t size = transQueueSize(&pConn->cliMsgs); + int32_t totalLen = 0; + if (size == 0) { + tError("%s conn %p not msg to send", pTransInst->label, pConn); + cliHandleExcept(pConn); + return; + } + uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); + + for (int i = 0; i < size; i++) { + SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i); + + STransConnCtx* pCtx = pCliMsg->ctx; + + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + + int msgLen = transMsgLenFromCont(pMsg->contLen); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + if (pHead->comp == 0) { + pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; + memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); + pHead->version = TRANS_VER; + pHead->compatibilityVer = htonl(pTransInst->compatibilityVer); + } + pHead->timestamp = taosHton64(taosGetTimestampUs()); + + if (pHead->comp == 0) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + } else { + msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + } + wb[i++] = uv_buf_init((char*)pHead, msgLen); + totalLen += msgLen; + } + uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + req->data = pConn; + tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, + totalLen); + uv_write(req, (uv_stream_t*)pConn->stream, wb, size, cliSendBatchCb); + taosMemoryFree(wb); +} void cliSendBatch(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1399,10 +1480,13 @@ void cliConnCb(uv_connect_t* req, int status) { tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); if (pConn->pBatch != NULL) { - cliSendBatch(pConn); - } else { - cliSend(pConn); + return cliSendBatch(pConn); } + if (pConn->inHeap) { + return cliSendBatch_shareConn(pConn); + } + + return cliSend(pConn); } static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { @@ -1596,9 +1680,68 @@ static void doFreeTimeoutMsg(void* param) { doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } + +static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { + size_t klen = strlen(key); + SHeap* p = taosHashGet(pConnHeapCache, key, klen); + if (p == NULL) { + SHeap heap = {0}; + transHeapCreate(&heap, compareHeapNode); + taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap)); + + p = taosHashGet(pConnHeapCache, key, strlen(key)); + return NULL; + } + SCliConn* pConn = NULL; + transHeapGet(p, &pConn); + return pConn; +} +static void addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) { + size_t klen = strlen(key); + SHeap* p = taosHashGet(pConnHeapCacahe, key, klen); + if (p == NULL) { + SHeap heap = {0}; + transHeapCreate(&heap, compareHeapNode); + taosHashPut(pConnHeapCacahe, key, klen, &heap, sizeof(heap)); + p = taosHashGet(pConnHeapCacahe, key, klen); + } + transHeapInsert(p, pConn); +} +void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) { + STraceId* trace = &pMsg->msg.info.traceId; + + STrans* pTransInst = pThrd->pTransInst; + + cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { + destroyCmsg(pMsg); + return; + } + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); + + SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); + if (pConn == NULL) { + tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn); + bool ignore = false; + pConn = getConnFromPool(pThrd, addr, &ignore); + if (pConn != NULL) { + return cliSendBatch_shareConn(pConn); + } + } + + pConn = cliCreateConn(pThrd); + addConnToHeapCache(pThrd->connHeapCache, addr, pConn); + + return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); +} void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; + if (pTransInst->shareConn == 1) { + return cliHandleReq__shareConn(pMsg, pThrd); + } + cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { destroyCmsg(pMsg); @@ -2083,6 +2226,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; @@ -2131,6 +2275,15 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); + + void** pIter2 = taosHashIterate(pThrd->connHeapCache, NULL); + while (pIter2 != NULL) { + SHeap* heap = (SHeap*)(*pIter2); + transHeapDestroy(heap); + pIter2 = (void**)taosHashIterate(pThrd->connHeapCache, pIter2); + } + taosHashCleanup(pThrd->connHeapCache); + taosMemoryFree(pThrd); } @@ -2800,3 +2953,53 @@ int64_t transAllocHandle() { return exh->refId; } + +int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { + SCliConn* args1 = container_of(a, SCliConn, node); + SCliConn* args2 = container_of(b, SCliConn, node); + if (transQueueSize(&args1->cliMsgs) > transQueueSize(&args2->cliMsgs)) { + return 0; + } + return 1; +} +int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) { + heap->heap = heapCreate(cmpFunc); + heap->cmpFunc = cmpFunc; + return 0; +} +void transHeapDestroy(SHeap* heap) { + if (heap != NULL) { + heapDestroy(heap->heap); + } +} +void transHeapGet(SHeap* heap, SCliConn** p) { + if (heapSize(heap->heap) == 0) { + *p = NULL; + return; + } + // HeapNode* minNode = headMin(heap->heap); + HeapNode* minNode = heapMin(heap->heap); + if (minNode == NULL) { + *p = NULL; + return; + } + *p = container_of(minNode, SCliConn, node); +} +int transHeapInsert(SHeap* heap, SCliConn* p) { + // impl later + if (p->inHeap == 1) { + return -1; + } + + heapInsert(heap->heap, &p->node); + p->inHeap = 1; + return 0; +} +int transHeapDelete(SHeap* heap, SCliConn* p) { + // impl later + if (p->inHeap == 0) { + return -1; + } + heapRemove(heap->heap, &p->node); + return 0; +} \ No newline at end of file