refactor transport
This commit is contained in:
parent
02ff9967fb
commit
8c11a7e998
|
@ -212,8 +212,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
||||||
cliHandleUpdate};
|
cliHandleUpdate};
|
||||||
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
|
||||||
/// NULL,cliHandleUpdate};
|
static void cliDealReq(queue* h, SCliThrd* pThrd);
|
||||||
|
static void cliBatchDealReq(queue* h, SCliThrd* pThrd);
|
||||||
|
static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq};
|
||||||
|
|
||||||
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||||
|
|
||||||
|
@ -1695,7 +1697,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
static void cliDealReq(queue* wq, SCliThrd* pThrd) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
while (!QUEUE_IS_EMPTY(wq)) {
|
while (!QUEUE_IS_EMPTY(wq)) {
|
||||||
|
@ -1709,7 +1711,6 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||||
|
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if (count >= 2) {
|
if (count >= 2) {
|
||||||
|
@ -1729,7 +1730,77 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
|
||||||
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
|
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
|
||||||
return batch;
|
return batch;
|
||||||
}
|
}
|
||||||
|
static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) {
|
||||||
|
STrans* pInst = pThrd->pTransInst;
|
||||||
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
|
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||||
|
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||||
|
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||||
|
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||||
|
size_t klen = strlen(key);
|
||||||
|
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
|
||||||
|
if (ppBatchList == NULL || *ppBatchList == NULL) {
|
||||||
|
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
|
||||||
|
QUEUE_INIT(&pBatchList->wq);
|
||||||
|
pBatchList->connMax = pInst->connLimitNum;
|
||||||
|
pBatchList->connCnt = 0;
|
||||||
|
pBatchList->batchLenLimit = pInst->batchSize;
|
||||||
|
pBatchList->len += 1;
|
||||||
|
|
||||||
|
pBatchList->ip = taosStrdup(ip);
|
||||||
|
pBatchList->dst = taosStrdup(key);
|
||||||
|
pBatchList->port = port;
|
||||||
|
|
||||||
|
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||||
|
QUEUE_INIT(&pBatch->wq);
|
||||||
|
QUEUE_INIT(&pBatch->listq);
|
||||||
|
|
||||||
|
QUEUE_PUSH(&pBatch->wq, h);
|
||||||
|
pBatch->wLen += 1;
|
||||||
|
pBatch->batchSize += pMsg->msg.contLen;
|
||||||
|
pBatch->pList = pBatchList;
|
||||||
|
|
||||||
|
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
|
||||||
|
|
||||||
|
taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
|
||||||
|
} else {
|
||||||
|
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
|
||||||
|
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||||
|
QUEUE_INIT(&pBatch->wq);
|
||||||
|
QUEUE_INIT(&pBatch->listq);
|
||||||
|
|
||||||
|
QUEUE_PUSH(&pBatch->wq, h);
|
||||||
|
pBatch->wLen += 1;
|
||||||
|
pBatch->batchSize = pMsg->msg.contLen;
|
||||||
|
pBatch->pList = *ppBatchList;
|
||||||
|
|
||||||
|
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
|
||||||
|
(*ppBatchList)->len += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
|
||||||
|
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
|
||||||
|
if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
|
||||||
|
QUEUE_PUSH(&pBatch->wq, h);
|
||||||
|
pBatch->batchSize += pMsg->msg.contLen;
|
||||||
|
pBatch->wLen += 1;
|
||||||
|
} else {
|
||||||
|
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||||
|
QUEUE_INIT(&pBatch->wq);
|
||||||
|
QUEUE_INIT(&pBatch->listq);
|
||||||
|
|
||||||
|
QUEUE_PUSH(&pBatch->wq, h);
|
||||||
|
pBatch->wLen += 1;
|
||||||
|
pBatch->batchSize += pMsg->msg.contLen;
|
||||||
|
pBatch->pList = *ppBatchList;
|
||||||
|
|
||||||
|
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
|
||||||
|
(*ppBatchList)->len += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
||||||
STrans* pInst = pThrd->pTransInst;
|
STrans* pInst = pThrd->pTransInst;
|
||||||
|
|
||||||
|
@ -1746,75 +1817,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
|
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
cliBuildBatch(pMsg, h, pThrd);
|
||||||
|
|
||||||
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
|
||||||
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
|
||||||
size_t klen = strlen(key);
|
|
||||||
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
|
|
||||||
if (ppBatchList == NULL || *ppBatchList == NULL) {
|
|
||||||
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
|
|
||||||
QUEUE_INIT(&pBatchList->wq);
|
|
||||||
pBatchList->connMax = pInst->connLimitNum;
|
|
||||||
pBatchList->connCnt = 0;
|
|
||||||
pBatchList->batchLenLimit = pInst->batchSize;
|
|
||||||
pBatchList->len += 1;
|
|
||||||
|
|
||||||
pBatchList->ip = taosStrdup(ip);
|
|
||||||
pBatchList->dst = taosStrdup(key);
|
|
||||||
pBatchList->port = port;
|
|
||||||
|
|
||||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
|
||||||
QUEUE_INIT(&pBatch->wq);
|
|
||||||
QUEUE_INIT(&pBatch->listq);
|
|
||||||
|
|
||||||
QUEUE_PUSH(&pBatch->wq, h);
|
|
||||||
pBatch->wLen += 1;
|
|
||||||
pBatch->batchSize += pMsg->msg.contLen;
|
|
||||||
pBatch->pList = pBatchList;
|
|
||||||
|
|
||||||
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
|
|
||||||
|
|
||||||
taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
|
|
||||||
} else {
|
|
||||||
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
|
|
||||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
|
||||||
QUEUE_INIT(&pBatch->wq);
|
|
||||||
QUEUE_INIT(&pBatch->listq);
|
|
||||||
|
|
||||||
QUEUE_PUSH(&pBatch->wq, h);
|
|
||||||
pBatch->wLen += 1;
|
|
||||||
pBatch->batchSize = pMsg->msg.contLen;
|
|
||||||
pBatch->pList = *ppBatchList;
|
|
||||||
|
|
||||||
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
|
|
||||||
(*ppBatchList)->len += 1;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
|
|
||||||
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
|
|
||||||
if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
|
|
||||||
QUEUE_PUSH(&pBatch->wq, h);
|
|
||||||
pBatch->batchSize += pMsg->msg.contLen;
|
|
||||||
pBatch->wLen += 1;
|
|
||||||
} else {
|
|
||||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
|
||||||
QUEUE_INIT(&pBatch->wq);
|
|
||||||
QUEUE_INIT(&pBatch->listq);
|
|
||||||
|
|
||||||
QUEUE_PUSH(&pBatch->wq, h);
|
|
||||||
pBatch->wLen += 1;
|
|
||||||
pBatch->batchSize += pMsg->msg.contLen;
|
|
||||||
pBatch->pList = *ppBatchList;
|
|
||||||
|
|
||||||
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
|
|
||||||
(*ppBatchList)->len += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||||
|
@ -1847,12 +1850,7 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
QUEUE_MOVE(&item->qmsg, &wq);
|
QUEUE_MOVE(&item->qmsg, &wq);
|
||||||
taosThreadMutexUnlock(&item->mtx);
|
taosThreadMutexUnlock(&item->mtx);
|
||||||
|
|
||||||
int8_t supportBatch = pTransInst->supportBatch;
|
cliDealFunc[pTransInst->supportBatch](&wq, pThrd);
|
||||||
if (supportBatch == 0) {
|
|
||||||
cliNoBatchDealReq(&wq, pThrd);
|
|
||||||
} else if (supportBatch == 1) {
|
|
||||||
cliBatchDealReq(&wq, pThrd);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
|
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue