Merge branch 'fix/fixInitFailedLog2' into enh/TD-31494

This commit is contained in:
yihaoDeng 2024-08-22 21:16:06 +08:00
commit 4e6f90df63
10 changed files with 914 additions and 504 deletions

View File

@ -125,7 +125,8 @@ typedef struct SRpcInit {
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
void *parent;
} SRpcInit;

View File

@ -386,6 +386,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConn = 1;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.notWaitAvaliableConn = 0;

View File

@ -3413,6 +3413,9 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
*pVLen = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
if (*pVLen < 0) {
return -1;
}
}
*pKey = pKtmp->key;

View File

@ -186,6 +186,7 @@ typedef struct {
uint32_t code; // del later
uint32_t msgType;
int32_t msgLen;
int32_t seqNum;
uint8_t content[0]; // message body starts from here
} STransMsgHead;

View File

@ -71,11 +71,13 @@ typedef struct {
int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize;
int8_t optBatchFetch;
int32_t timeToGetConn;
int index;
void* parent;
void* tcphandle; // returned handle from TCP initialization
int64_t refId;
int8_t shareConn;
TdThreadMutex mutex;
} SRpcInfo;

View File

@ -115,6 +115,8 @@ void* rpcOpen(const SRpcInit* pInit) {
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
(void)transAcquireExHandle(transGetInstMgt(), refId);
pRpc->refId = refId;
pRpc->shareConn = pInit->shareConn;
return (void*)refId;
_end:
taosMemoryFree(pRpc);

File diff suppressed because it is too large Load Diff

View File

@ -330,6 +330,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
(void)taosThreadMutexLock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q);
(void)taosThreadMutexUnlock(&item->mtx);
int ret = uv_async_send(async);
if (ret != 0) {
tError("failed to send async,reason:%s", uv_err_name(ret));

View File

@ -33,8 +33,8 @@ typedef struct SSvrConn {
queue queue;
SConnBuffer readBuf; // read buf,
int inType;
void* pTransInst; // rpc init
void* ahandle; //
void* pInst; // rpc init
void* ahandle; //
void* hostThrd;
STransQueue srvMsgs;
@ -92,7 +92,7 @@ typedef struct SWorkThrd {
queue msg;
queue conn;
void* pTransInst;
void* pInst;
bool quit;
SIpWhiteListTab* pWhiteList;
@ -369,8 +369,66 @@ void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
pConn->whiteListVer = pWhite->ver;
}
static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) {
if (!(rpcDebugFlag & DEBUG_DEBUG)) {
return;
}
STrans* pInst = pConn->pInst;
STraceId* trace = &pHead->traceId;
int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp);
static int64_t EXCEPTION_LIMIT_US = 100 * 1000;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
// transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pInst),
pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pInst), pConn,
TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost));
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", transLabel(pInst),
pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp,
pTransMsg->code, (int)(cost));
}
}
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn,
pConn->refId);
}
static int8_t uvValidConn(SSvrConn* pConn) {
STrans* pInst = pConn->pInst;
SWorkThrd* pThrd = pConn->hostThrd;
int8_t forbiddenIp = 0;
if (pThrd->enableIpWhiteList) {
forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0;
if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
}
return forbiddenIp;
}
static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) {
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
tDebug("conn %p acquired by server app", pConn);
} else if (pHead->noResp == 0) {
transRefSrvHandle(pConn);
}
}
}
static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst;
STrans* pInst = pConn->pInst;
SWorkThrd* pThrd = pConn->hostThrd;
STransMsgHead* pHead = NULL;
@ -378,15 +436,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
tError("%s conn %p read invalid packet", transLabel(pInst), pConn);
return false;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn);
tTrace("%s conn %p not reset read buf", transLabel(pInst), pConn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn);
return false;
}
pHead->code = htonl(pHead->code);
@ -407,75 +465,32 @@ static bool uvHandleReq(SSvrConn* pConn) {
return true;
}
// TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
STransMsg transMsg;
memset(&transMsg, 0, sizeof(transMsg));
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = pHead->content;
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
if (pConn->status == ConnNormal) {
if (pHead->persist == 1) {
pConn->status = ConnAcquire;
transRefSrvHandle(pConn);
tDebug("conn %p acquired by server app", pConn);
}
if (pHead->seqNum != 0) {
ASSERT(0);
}
STraceId* trace = &pHead->traceId;
int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp);
static int64_t EXCEPTION_LIMIT_US = 100 * 1000;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost));
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost));
}
}
// pHead->noResp = 1,
// 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
transMsg.info.refId = pConn->refId;
ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
transMsg.info.refId = pHead->noResp == 1 ? -1 : pConn->refId;
transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp;
transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId);
ASSERTS(transMsg.info.handle != NULL, "trans-svr failed to alloc handle to msg");
if (transMsg.info.handle == NULL) {
return false;
}
uvMaySetConnAcquired(pConn, pHead);
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
}
uvPerfLog_receive(pConn, pHead, &transMsg);
// set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
@ -485,7 +500,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
(void)transReleaseExHandle(transGetRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
(*pInst->cfp)(pInst->parent, &transMsg, NULL);
return true;
}
@ -500,7 +515,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
destroyConn(conn, true);
return;
}
STrans* pTransInst = conn->pTransInst;
STrans* pInst = conn->pInst;
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
@ -508,16 +523,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) {
if (true == pBuf->invalid || false == uvHandleReq(conn)) {
tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pTransInst), conn,
conn->dst, conn->src);
tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pInst), conn, conn->dst,
conn->src);
destroyConn(conn, true);
return;
}
}
return;
} else {
tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pTransInst),
conn, conn->dst, conn->src);
tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pInst), conn,
conn->dst, conn->src);
destroyConn(conn, true);
return;
}
@ -526,14 +541,14 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return;
}
tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
tDebug("%s conn %p read error:%s", transLabel(pInst), conn, uv_err_name(nread));
if (nread < 0) {
conn->broken = true;
if (conn->status == ConnAcquire) {
if (conn->regArg.init) {
tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn);
STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
tTrace("%s conn %p broken, notify server app", transLabel(pInst), conn);
STrans* pInst = conn->pInst;
(*pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
}
@ -579,8 +594,8 @@ void uvOnSendCb(uv_write_t* req, int status) {
conn->regArg.init = 1;
conn->regArg.msg = msg->msg;
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
STrans* pInst = conn->pInst;
(pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
(void)transQueuePop(&conn->srvMsgs);
@ -635,7 +650,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer);
pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer);
pHead->version = TRANS_VER;
// handle invalid drop_task resp, TD-20098
@ -667,16 +682,16 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
STrans* pTransInst = pConn->pTransInst;
if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pTransInst->compressSize != -1 &&
pTransInst->compressSize < pMsg->contLen) {
STrans* pInst = pConn->pInst;
if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pInst->compressSize != -1 &&
pInst->compressSize < pMsg->contLen) {
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)len);
}
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType),
pConn->dst, pConn->src, len);
wb->base = (char*)pHead;
wb->len = len;
@ -841,8 +856,8 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
}
if (pConn->regArg.init) {
tTrace("conn %p release, notify server app", pConn);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL);
STrans* pInst = pConn->pInst;
(*pInst->cfp)(pInst->parent, &(pConn->regArg.msg), NULL);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
}
uvStartSendRespImpl(srvMsg);
@ -998,6 +1013,16 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return;
}
// pConn->pInst = pThrd->pInst;
// /* init conn timer*/
// // uv_timer_init(pThrd->loop, &pConn->pTimer);
// // pConn->pTimer.data = pConn;
// pConn->hostThrd = pThrd;
// // init client handle
// pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
// uv_tcp_init(pThrd->loop, pConn->pTcp);
// pConn->pTcp->data = pConn;
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd;
(void)uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
@ -1181,7 +1206,6 @@ void* transWorkerThread(void* arg) {
static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
int32_t code = 0;
SWorkThrd* pThrd = hThrd;
STrans* pTransInst = pThrd->pTransInst;
int32_t lino;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
@ -1223,12 +1247,17 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
}
STrans* pInst = pThrd->pInst;
pConn->refId = exh->refId;
QUEUE_INIT(&exh->q);
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
pConn->pTransInst = pThrd->pTransInst;
pConn->pInst = pThrd->pInst;
/* init conn timer*/
// uv_timer_init(pThrd->loop, &pConn->pTimer);
// pConn->pTimer.data = pConn;
pConn->hostThrd = pThrd;
// init client handle
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
@ -1238,8 +1267,8 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
code = uv_tcp_init(pThrd->loop, pConn->pTcp);
if (code != 0) {
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _end);
tError("%s failed to create conn since %s" PRId64, transLabel(pInst), uv_strerror(code));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
}
pConn->pTcp->data = pConn;
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
@ -1253,7 +1282,7 @@ _end:
taosMemoryFree(pConn);
pConn = NULL;
}
tError("%s failed to create conn since %s, lino:%d" PRId64, transLabel(pTransInst), tstrerror(code), lino);
tError("%s failed to create conn since %s" PRId64, transLabel(pInst), tstrerror(code));
return NULL;
}
@ -1317,8 +1346,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
STrans* pTransInst = thrd->pTransInst;
tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
STrans* pInst = thrd->pInst;
tDebug("%s conn %p destroy", transLabel(pInst), conn);
for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) {
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
@ -1434,9 +1463,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
thrd->pTransInst = shandle;
thrd->pInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
srv->pThreadObj[i] = thrd;
@ -1465,9 +1494,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End;
}
thrd->pTransInst = shandle;
thrd->pInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
if (thrd->pWhiteList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1568,18 +1597,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
uvStartSendRespImpl(msg);
return;
} else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn);
}
destroySmsg(msg);
}
void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) {
// send msg to client
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn);
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);
uvStartSendResp(msg);
}
void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
SSvrConn* conn = msg->pConn;
tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn);
tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn);
if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
@ -1596,8 +1625,8 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
tDebug("conn %p register brokenlink callback succ", conn);
if (conn->broken) {
STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
STrans* pInst = conn->pInst;
(*pInst->cfp)(pInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
}
taosMemoryFree(msg);
@ -1735,7 +1764,7 @@ int32_t transReleaseSrvHandle(void* handle) {
m->msg = tmsg;
m->type = Release;
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
tDebug("%s conn %p start to release", transLabel(pThrd->pInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId);
@ -1830,8 +1859,8 @@ int32_t transRegisterMsg(const STransMsg* msg) {
m->msg = tmsg;
m->type = Register;
STrans* pTransInst = pThrd->pTransInst;
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
STrans* pInst = pThrd->pInst;
tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId);
@ -1853,15 +1882,15 @@ _return2:
}
int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
if (pTransInst == NULL) {
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
if (pInst == NULL) {
return TSDB_CODE_RPC_MODULE_QUIT;
}
int32_t code = 0;
tDebug("ip-white-list update on rpc");
SServerObj* svrObj = pTransInst->tcphandle;
SServerObj* svrObj = pInst->tcphandle;
for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i];

View File

@ -21,6 +21,7 @@ size_t heapSize(Heap* heap) { return heap->nelts; }
Heap* heapCreate(HeapCompareFn fn) {
Heap* heap = taosMemoryCalloc(1, sizeof(Heap));
if (heap == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -226,9 +227,9 @@ static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */ }
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void* tmp = a->data;
a->data = b->data;
b->data = tmp;
void* tmp = a->data;
a->data = b->data;
b->data = tmp;
}
#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))