opt transport

This commit is contained in:
Yihao Deng 2024-05-29 09:20:30 +00:00
parent 2cdaa8c962
commit 9138ea4cec
5 changed files with 217 additions and 10 deletions

View File

@ -372,6 +372,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConn = 1;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));

View File

@ -50,7 +50,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp);
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg);
@ -931,13 +931,13 @@ end:
return ret;
}
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp) {
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = -1;
SMetaReader mr = {0};
SVDropTtlTableReq ttlReq = {0};
SVFetchTtlExpiredTbsRsp rsp = {0};
SEncoder encoder = {0};
SArray* pNames = NULL;
SArray *pNames = NULL;
pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
@ -965,12 +965,12 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
}
char buf[TSDB_TABLE_NAME_LEN];
for (int32_t i = 0; i < ttlReq.nUids; ++i) {
tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i);
tb_uid_t *uid = taosArrayGet(ttlReq.pTbUids, i);
expiredTb.suid = *uid;
terrno = metaReaderGetTableEntryByUid(&mr, *uid);
if (terrno < 0) goto _end;
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
void* p = taosArrayPush(pNames, buf);
void *p = taosArrayPush(pNames, buf);
expiredTb.name = p;
if (mr.me.type == TSDB_CHILD_TABLE) {
expiredTb.suid = mr.me.ctbEntry.suid;

View File

@ -75,6 +75,7 @@ typedef struct {
void* parent;
void* tcphandle; // returned handle from TCP initialization
int64_t refId;
int8_t shareConn;
TdThreadMutex mutex;
} SRpcInfo;

View File

@ -112,6 +112,8 @@ void* rpcOpen(const SRpcInit* pInit) {
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
transAcquireExHandle(transGetInstMgt(), refId);
pRpc->refId = refId;
pRpc->shareConn = pInit->shareConn;
return (void*)refId;
}
void rpcClose(void* arg) {

View File

@ -76,6 +76,9 @@ typedef struct SCliConn {
SDelayTask* task;
HeapNode node; // for heap
int8_t inHeap;
char* dstAddr;
char src[32];
char dst[32];
@ -119,6 +122,7 @@ typedef struct SCliThrd {
SHashObj* failFastCache;
SHashObj* batchCache;
SHashObj* connHeapCache;
SCliMsg* stopMsg;
bool quit;
@ -230,6 +234,23 @@ static void destroyThrdObj(SCliThrd* pThrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);
typedef struct {
void* p;
HeapNode node;
} SHeapNode;
typedef struct {
// void* p;
Heap* heap;
int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b);
} SHeap;
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
void transHeapDestroy(SHeap* heap);
void transHeapGet(SHeap* heap, SCliConn** p);
int transHeapInsert(SHeap* heap, SCliConn* p);
int transHeapDelete(SHeap* heap, SCliConn* p);
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
@ -1054,6 +1075,66 @@ static void cliSendCb(uv_write_t* req, int status) {
}
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
void cliSendBatch_shareConn(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
int32_t size = transQueueSize(&pConn->cliMsgs);
int32_t totalLen = 0;
if (size == 0) {
tError("%s conn %p not msg to send", pTransInst->label, pConn);
cliHandleExcept(pConn);
return;
}
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
for (int i = 0; i < size; i++) {
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, i);
STransConnCtx* pCtx = pCliMsg->ctx;
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
int msgLen = transMsgLenFromCont(pMsg->contLen);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
if (pHead->comp == 0) {
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
if (pHead->comp == 0) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
} else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
wb[i++] = uv_buf_init((char*)pHead, msgLen);
totalLen += msgLen;
}
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn;
tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size,
totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, size, cliSendBatchCb);
taosMemoryFree(wb);
}
void cliSendBatch(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
@ -1399,10 +1480,13 @@ void cliConnCb(uv_connect_t* req, int status) {
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->pBatch != NULL) {
cliSendBatch(pConn);
} else {
cliSend(pConn);
return cliSendBatch(pConn);
}
if (pConn->inHeap) {
return cliSendBatch_shareConn(pConn);
}
return cliSend(pConn);
}
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) {
@ -1596,9 +1680,68 @@ static void doFreeTimeoutMsg(void* param) {
doNotifyApp(pMsg, pThrd, code);
taosMemoryFree(arg);
}
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
size_t klen = strlen(key);
SHeap* p = taosHashGet(pConnHeapCache, key, klen);
if (p == NULL) {
SHeap heap = {0};
transHeapCreate(&heap, compareHeapNode);
taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap));
p = taosHashGet(pConnHeapCache, key, strlen(key));
return NULL;
}
SCliConn* pConn = NULL;
transHeapGet(p, &pConn);
return pConn;
}
static void addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) {
size_t klen = strlen(key);
SHeap* p = taosHashGet(pConnHeapCacahe, key, klen);
if (p == NULL) {
SHeap heap = {0};
transHeapCreate(&heap, compareHeapNode);
taosHashPut(pConnHeapCacahe, key, klen, &heap, sizeof(heap));
p = taosHashGet(pConnHeapCacahe, key, klen);
}
transHeapInsert(p, pConn);
}
void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId;
STrans* pTransInst = pThrd->pTransInst;
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
destroyCmsg(pMsg);
return;
}
char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
if (pConn == NULL) {
tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn);
bool ignore = false;
pConn = getConnFromPool(pThrd, addr, &ignore);
if (pConn != NULL) {
return cliSendBatch_shareConn(pConn);
}
}
pConn = cliCreateConn(pThrd);
addConnToHeapCache(pThrd->connHeapCache, addr, pConn);
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
if (pTransInst->shareConn == 1) {
return cliHandleReq__shareConn(pMsg, pThrd);
}
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
destroyCmsg(pMsg);
@ -2083,6 +2226,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false;
@ -2131,6 +2275,15 @@ static void destroyThrdObj(SCliThrd* pThrd) {
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
void** pIter2 = taosHashIterate(pThrd->connHeapCache, NULL);
while (pIter2 != NULL) {
SHeap* heap = (SHeap*)(*pIter2);
transHeapDestroy(heap);
pIter2 = (void**)taosHashIterate(pThrd->connHeapCache, pIter2);
}
taosHashCleanup(pThrd->connHeapCache);
taosMemoryFree(pThrd);
}
@ -2800,3 +2953,53 @@ int64_t transAllocHandle() {
return exh->refId;
}
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
SCliConn* args1 = container_of(a, SCliConn, node);
SCliConn* args2 = container_of(b, SCliConn, node);
if (transQueueSize(&args1->cliMsgs) > transQueueSize(&args2->cliMsgs)) {
return 0;
}
return 1;
}
int transHeapCreate(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b)) {
heap->heap = heapCreate(cmpFunc);
heap->cmpFunc = cmpFunc;
return 0;
}
void transHeapDestroy(SHeap* heap) {
if (heap != NULL) {
heapDestroy(heap->heap);
}
}
void transHeapGet(SHeap* heap, SCliConn** p) {
if (heapSize(heap->heap) == 0) {
*p = NULL;
return;
}
// HeapNode* minNode = headMin(heap->heap);
HeapNode* minNode = heapMin(heap->heap);
if (minNode == NULL) {
*p = NULL;
return;
}
*p = container_of(minNode, SCliConn, node);
}
int transHeapInsert(SHeap* heap, SCliConn* p) {
// impl later
if (p->inHeap == 1) {
return -1;
}
heapInsert(heap->heap, &p->node);
p->inHeap = 1;
return 0;
}
int transHeapDelete(SHeap* heap, SCliConn* p) {
// impl later
if (p->inHeap == 0) {
return -1;
}
heapRemove(heap->heap, &p->node);
return 0;
}