Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/ly_max_delay

This commit is contained in:
54liuyao 2024-02-04 15:08:28 +08:00
commit 9b7ac50f59
2 changed files with 12 additions and 11 deletions

View File

@ -2515,7 +2515,7 @@ int transReleaseCliHandle(void* handle) {
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
if (pThrd == NULL) { if (pThrd == NULL) {
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527}; STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527};
@ -2535,7 +2535,7 @@ int transReleaseCliHandle(void* handle) {
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
destroyCmsg(cmsg); destroyCmsg(cmsg);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
return 0; return 0;
} }
@ -2544,7 +2544,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) { if (pTransInst == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
@ -2577,7 +2577,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
destroyCmsg(cliMsg); destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0; return 0;
@ -2589,7 +2589,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
if (pTransInst == NULL) { if (pTransInst == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
taosMemoryFree(pTransRsp); taosMemoryFree(pTransRsp);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
@ -2627,6 +2627,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) { if (ret != 0) {
destroyCmsg(cliMsg); destroyCmsg(cliMsg);
ret = TSDB_CODE_RPC_BROKEN_LINK;
goto _RETURN; goto _RETURN;
} }
tsem_wait(sem); tsem_wait(sem);
@ -2661,7 +2662,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
if (pTransInst == NULL) { if (pTransInst == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg); taosMemoryFree(pTransMsg);
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
@ -2698,6 +2699,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) { if (ret != 0) {
destroyCmsg(cliMsg); destroyCmsg(cliMsg);
ret = TSDB_CODE_RPC_BROKEN_LINK;
goto _RETURN; goto _RETURN;
} }
@ -2726,7 +2728,7 @@ _RETURN:
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) { if (pTransInst == NULL) {
return -1; return TSDB_CODE_RPC_BROKEN_LINK;
} }
SCvtAddr cvtAddr = {0}; SCvtAddr cvtAddr = {0};
@ -2750,7 +2752,6 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
destroyCmsg(cliMsg); destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1;
} }
} }
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);

View File

@ -89,7 +89,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
if (qinfo.timestamp != 0) { if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp; int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) { if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost); uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
} }
} }
@ -210,7 +210,7 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
if (qinfo.timestamp != 0) { if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp; int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) { if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost); uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
} }
} }
@ -357,7 +357,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
if (qinfo.timestamp != 0) { if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp; int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) { if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost); uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
} }
} }