port main to 3.0
This commit is contained in:
parent
9429c9d04c
commit
f853272d77
|
@ -585,11 +585,11 @@ void* destroyConnPool(SCliThrd* pThrd) {
|
|||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||
void* pool = pThrd->pool;
|
||||
STrans* pTranInst = pThrd->pTransInst;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key) + 1);
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key));
|
||||
|
||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||
QUEUE_INIT(&nList->msgQ);
|
||||
|
@ -624,11 +624,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
|||
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||
void* pool = pThrd->pool;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key) + 1);
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key));
|
||||
|
||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||
QUEUE_INIT(&nList->msgQ);
|
||||
|
@ -714,7 +714,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
cliDestroyConnMsgs(conn, false);
|
||||
|
||||
if (conn->list == NULL) {
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||
}
|
||||
|
||||
SConnList* pList = conn->list;
|
||||
|
@ -1279,7 +1279,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
|
||||
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
||||
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1);
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr));
|
||||
int64_t cTimestamp = taosGetTimestampMs();
|
||||
if (item != NULL) {
|
||||
int32_t elapse = cTimestamp - item->timestamp;
|
||||
|
@ -1291,7 +1291,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
}
|
||||
} else {
|
||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem));
|
||||
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -1471,7 +1471,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
|||
}
|
||||
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
|
||||
uint32_t addr = 0;
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
|
||||
if (v == NULL) {
|
||||
addr = taosGetIpv4FromFqdn(fqdn);
|
||||
if (addr == 0xffffffff) {
|
||||
|
@ -1480,7 +1480,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
|
|||
return addr;
|
||||
}
|
||||
|
||||
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
|
||||
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
|
||||
} else {
|
||||
addr = *v;
|
||||
}
|
||||
|
@ -1490,13 +1490,13 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
|
|||
// impl later
|
||||
uint32_t addr = taosGetIpv4FromFqdn(fqdn);
|
||||
if (addr != 0xffffffff) {
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
|
||||
if (addr != *v) {
|
||||
char old[64] = {0}, new[64] = {0};
|
||||
tinet_ntoa(old, *v);
|
||||
tinet_ntoa(new, addr);
|
||||
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
|
||||
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
|
||||
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
@ -1537,21 +1537,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
if (tmsgIsValid(pMsg->msg.msgType)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count + 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
|
||||
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
|
||||
char addr[TSDB_FQDN_LEN + 64] = {0};
|
||||
|
@ -1705,7 +1690,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
|||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
// SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
|
||||
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
|
||||
if (ppBatchList == NULL || *ppBatchList == NULL) {
|
||||
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
|
||||
|
@ -1800,21 +1784,6 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
taosThreadMutexUnlock(&item->mtx);
|
||||
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
void* pIter = taosHashIterate(pThrd->msgCount, NULL);
|
||||
while (pIter != NULL) {
|
||||
int* count = pIter;
|
||||
size_t len = 0;
|
||||
char* key = taosHashGetKey(pIter, &len);
|
||||
if (*count != 0) {
|
||||
tDebug("key: %s count: %d", key, *count);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pThrd->msgCount, pIter);
|
||||
}
|
||||
tDebug("all conn count: %d", pThrd->newConnCount);
|
||||
}
|
||||
|
||||
int8_t supportBatch = pTransInst->supportBatch;
|
||||
if (supportBatch == 0) {
|
||||
cliNoBatchDealReq(&wq, pThrd);
|
||||
|
@ -2411,20 +2380,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
}
|
||||
}
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
if (tmsgIsValid(pResp->msgType - 1)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 0;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count - 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
|
||||
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
if (pCtx->pSem) {
|
||||
|
|
Loading…
Reference in New Issue