Merge remote-tracking branch 'origin/3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-29 08:59:15 +08:00
parent 22f0d9b793
commit 08f962a1d6
8 changed files with 44 additions and 30 deletions

View File

@ -128,12 +128,12 @@ typedef struct SRpcInit {
int32_t connLimitLock;
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
int32_t shareConnLimit;
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t startReadTimer;
void *parent;
void *parent;
} SRpcInit;
typedef struct {

View File

@ -370,6 +370,7 @@ int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread,
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 1000);
rpcInit.connLimitNum = connLimitNum;
rpcInit.shareConnLimit = 8;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer));

View File

@ -404,7 +404,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConnLimit = 16;
rpcInit.shareConn = 1;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.notWaitAvaliableConn = 0;
@ -454,7 +454,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConnLimit = 32;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
@ -503,7 +503,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConnLimit = 64;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
@ -560,6 +560,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.shareConnLimit = 16;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);

View File

@ -70,7 +70,7 @@ typedef struct {
int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize;
int32_t shareConnLimit;
int8_t optBatchFetch;
int32_t timeToGetConn;
int index;

View File

@ -79,7 +79,10 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connLimitLock = pInit->connLimitLock;
pRpc->supportBatch = pInit->supportBatch;
pRpc->batchSize = pInit->batchSize;
pRpc->shareConnLimit = pInit->shareConnLimit;
if (pRpc->shareConnLimit <= 0) {
pRpc->shareConnLimit = BUFFER_LIMIT;
}
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) {

View File

@ -48,7 +48,7 @@ typedef struct {
queue wq;
queue listq;
int32_t wLen;
int32_t batchSize; //
int32_t shareConnLimit; //
int32_t batch;
SCliBatchList* pList;
} SCliBatch;
@ -961,6 +961,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
int32_t code = 0;
int32_t lino = 0;
STrans* pInst = pThrd->pInst;
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
if (conn == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _failed);
@ -971,6 +972,9 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
conn->dstAddr = taosStrdup(addr);
conn->ipStr = taosStrdup(ip);
conn->port = port;
if (conn->dstAddr == NULL || conn->ipStr == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _failed);
}
conn->hostThrd = pThrd;
conn->status = ConnNormal;
@ -1006,8 +1010,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
TAOS_CHECK_GOTO(code, NULL, _failed);
}
conn->bufSize = BUFFER_LIMIT;
conn->buf = (uv_buf_t*)taosMemoryCalloc(1, BUFFER_LIMIT * sizeof(uv_buf_t));
conn->bufSize = pInst->shareConnLimit;
conn->buf = (uv_buf_t*)taosMemoryCalloc(1, pInst->shareConnLimit * sizeof(uv_buf_t));
if (conn->buf == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _failed);
}
@ -1870,7 +1874,7 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
return;
}
pBatchList->batchLenLimit = pInst->batchSize;
pBatchList->batchLenLimit = pInst->shareConnLimit;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
@ -1895,9 +1899,9 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pReq->msg.contLen;
pBatch->shareConnLimit += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
@ -1962,7 +1966,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* p
QUEUE_PUSH(&pBatch->wq, &pReq->q);
pBatch->wLen += 1;
pBatch->batchSize = pReq->msg.contLen;
pBatch->shareConnLimit = pReq->msg.contLen;
pBatch->pList = pList;
QUEUE_PUSH(&pList->wq, &pBatch->listq);
@ -3332,12 +3336,12 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea
return code;
}
static FORCE_INLINE int8_t shouldSWitchToOtherConn(int32_t reqNum, int32_t sentNum, int32_t stateNum) {
static FORCE_INLINE int8_t shouldSWitchToOtherConn(STrans* pInst, int32_t reqNum, int32_t sentNum, int32_t stateNum) {
int32_t total = reqNum + sentNum;
if (stateNum >= STATE_BUFFER_LIMIT) {
if (stateNum >= pInst->shareConnLimit) {
return 1;
}
if (total >= BUFFER_LIMIT) {
if (total >= pInst->shareConnLimit) {
return 1;
}
@ -3353,11 +3357,13 @@ static FORCE_INLINE bool filterToDebug(void* e, void* arg) {
static FORCE_INLINE void logConnMissHit(SCliConn* pConn) {
// queue set;
// QUEUE_INIT(&set);
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
pConn->heapMissHit++;
tDebug("conn %p has %d reqs, %d sentout and %d status in process, total limit:%d, switch to other conn", pConn,
transQueueSize(&pConn->reqsToSend), transQueueSize(&pConn->reqsSentOut), taosHashGetSize(pConn->pQTable),
BUFFER_LIMIT);
// if (transQueueSize(&pConn->reqsSentOut) >= BUFFER_LIMIT) {
pInst->shareConnLimit);
// if (transQueueSize(&pConn->reqsSentOut) >= pInst->shareConnLimit) {
// transQueueRemoveByFilter(&pConn->reqsSentOut, filterToDebug, NULL, &set, 1);
// }
}
@ -3376,11 +3382,12 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
return NULL;
} else {
tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt);
int32_t reqsNum = transQueueSize(&pConn->reqsToSend);
int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut);
int32_t stateNum = taosHashGetSize(pConn->pQTable);
if (shouldSWitchToOtherConn(reqsNum, reqsSentOut, stateNum)) {
int32_t reqsNum = transQueueSize(&pConn->reqsToSend);
int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut);
int32_t stateNum = taosHashGetSize(pConn->pQTable);
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
if (shouldSWitchToOtherConn(pInst, reqsNum, reqsSentOut, stateNum)) {
logConnMissHit(pConn);
return NULL;
}

View File

@ -1293,7 +1293,6 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
TAOS_CHECK_GOTO(code, &lino, _end);
}
// memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false;
pConn->status = ConnNormal;
@ -1320,7 +1319,6 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
pConn->refId = exh->refId;
QUEUE_INIT(&exh->q);
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
pConn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
@ -1337,8 +1335,11 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
pConn->bufSize = BUFFER_LIMIT;
pConn->buf = taosMemoryCalloc(1, sizeof(uv_buf_t));
pConn->bufSize = pInst->shareConnLimit;
pConn->buf = taosMemoryCalloc(1, pInst->shareConnLimit * sizeof(uv_buf_t));
if (pConn->buf == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
code = uv_tcp_init(pThrd->loop, pConn->pTcp);
if (code != 0) {
@ -1351,6 +1352,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
pConn->pInst = pThrd->pInst;
pConn->hostThrd = pThrd;
transRefSrvHandle(pConn);
return pConn;
_end:
if (pConn) {

View File

@ -117,7 +117,7 @@ int main(int argc, char *argv[]) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.connLimitNum = 10;
rpcInit.connLimitLock = 1;
rpcInit.batchSize = 16 * 1024;
rpcInit.shareConnLimit = 16 * 1024;
rpcInit.supportBatch = 1;
rpcDebugFlag = 135;