enh: adjust log format

This commit is contained in:
Simon Guan 2025-02-26 16:15:21 +08:00
parent 03d507422b
commit 61e0e77dcf
2 changed files with 159 additions and 159 deletions

View File

@ -388,7 +388,7 @@ int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) {
tDebug("no available timer, create a timer %p", timer);
int ret = uv_timer_init(pThrd->loop, timer);
if (ret != 0) {
tError("conn %p failed to init timer %p since %s", pConn, timer, uv_err_name(ret));
tError("conn:%p, failed to init timer %p since %s", pConn, timer, uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
}
@ -400,11 +400,11 @@ void cliResetConnTimer(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
if (conn->timer) {
if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
tDebug("%s conn:%p, stop timer", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(uv_timer_stop(conn->timer));
}
if (taosArrayPush(pThrd->timerList, &conn->timer) == NULL) {
tError("%s conn %p failed to push timer %p to list since %s", CONN_GET_INST_LABEL(conn), conn, conn->timer,
tError("%s conn:%p, failed to push timer %p to list since %s", CONN_GET_INST_LABEL(conn), conn, conn->timer,
tstrerror(terrno));
}
conn->timer->data = NULL;
@ -435,7 +435,7 @@ void cliConnMayUpdateTimer(SCliConn* conn, int64_t timeout) {
}
int ret = uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0);
if (ret != 0) {
tError("%s conn %p failed to start timer %p since %s", CONN_GET_INST_LABEL(conn), conn, conn->timer,
tError("%s conn:%p, failed to start timer %p since %s", CONN_GET_INST_LABEL(conn), conn, conn->timer,
uv_err_name(ret));
}
}
@ -493,7 +493,7 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
tTrace("%s conn %p in-process req summary:reqsToSend:%d, reqsSentOut:%d, statusTableSize:%d",
tTrace("%s conn:%p, in-process req summary:reqsToSend:%d, reqsSentOut:%d, statusTableSize:%d",
CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend), transQueueSize(&conn->reqsSentOut),
taosHashGetSize(conn->pQTable));
@ -503,14 +503,14 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
conn->forceDelFromHeap = 1;
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tDebug("%s conn:%p, failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tstrerror(code));
TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
return 1;
} else {
if (code != 0) {
tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tDebug("%s conn:%p, failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tstrerror(code));
return 0;
}
@ -519,10 +519,10 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
return 1;
} else if ((transQueueSize(&conn->reqsToSend) == 0) && (transQueueSize(&conn->reqsSentOut) == 0) &&
(taosHashGetSize(conn->pQTable) != 0)) {
tDebug("%s conn %p do balance directly", CONN_GET_INST_LABEL(conn), conn);
tDebug("%s conn:%p, do balance directly", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
} else {
tTrace("%s conn %p may do balance", CONN_GET_INST_LABEL(conn), conn);
tTrace("%s conn:%p, may do balance", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
}
return 0;
@ -573,12 +573,12 @@ int8_t cliMayNotifyUserOnRecvReleaseExcept(SCliConn* conn, STransMsgHead* pHead,
STraceId* trace = &pHead->traceId;
code = cliBuildExceptResp(pThrd, pReq, &resp);
if (code != 0) {
tGWarn("%s conn %p failed to build except resp for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tGWarn("%s conn:%p, failed to build except resp for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tstrerror(code));
}
code = cliNotifyCb(conn, NULL, &resp);
if (code != 0) {
tGWarn("%s conn %p failed to notify user for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tGWarn("%s conn:%p, failed to notify user for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tstrerror(code));
}
@ -593,7 +593,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId;
int64_t seqNum = taosHton64(pHead->seqNum);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "",
tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum,
qId);
@ -602,12 +602,12 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
code = taosHashRemove(conn->pQTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("%s conn %p failed to release req:%" PRId64 " from conn", CONN_GET_INST_LABEL(conn), conn, qId);
tDebug("%s conn:%p, failed to release req:%" PRId64 " from conn", CONN_GET_INST_LABEL(conn), conn, qId);
}
code = taosHashRemove(pThrd->pIdConnTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("%s conn %p failed to release req:%" PRId64 " from thrd ", CONN_GET_INST_LABEL(conn), conn, qId);
tDebug("%s conn:%p, failed to release req:%" PRId64 " from thrd ", CONN_GET_INST_LABEL(conn), conn, qId);
}
tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend),
@ -653,7 +653,7 @@ int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, ST
pCtx->st = taosGetTimestampUs();
STraceId* trace = &pHead->traceId;
pResp->info.ahandle = transCtxDumpVal(pCtx, pHead->msgType);
tGDebug("%s conn %p %s received from %s, local info:%s, sid:%" PRId64 ", create ahandle %p by %s",
tGDebug("%s conn:%p, %s received from %s, local info:%s, sid:%" PRId64 ", create ahandle %p by %s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, qId, pResp->info.ahandle,
TMSG_INFO(pHead->msgType));
return 0;
@ -677,17 +677,17 @@ void cliHandleResp(SCliConn* conn) {
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0);
if (msgLen < 0) {
taosMemoryFree(pHead);
tWarn("%s conn %p recv invalid packet", CONN_GET_INST_LABEL(conn), conn);
tWarn("%s conn:%p, recv invalid packet", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb
code = pThrd->notifyExceptCb(pThrd, NULL, NULL);
if (code != 0) {
tError("%s conn %p failed to notify user since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tError("%s conn:%p, failed to notify user since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
return;
}
if ((code = transDecompressMsg((char**)&pHead, &msgLen)) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
tDebug("%s conn:%p, recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb
return;
}
@ -712,7 +712,7 @@ void cliHandleResp(SCliConn* conn) {
return;
}
if (code != 0) {
tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", sid:%" PRId64
tWarn("%s conn:%p, recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", sid:%" PRId64
", the sever may sends repeated response since %s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
// TODO: notify cb
@ -725,7 +725,7 @@ void cliHandleResp(SCliConn* conn) {
} else {
code = cliHandleState_mayUpdateStateTime(conn, pReq);
if (code != 0) {
tDebug("%s conn %p failed to update state time sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tDebug("%s conn:%p, failed to update state time sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tstrerror(code));
}
}
@ -733,7 +733,7 @@ void cliHandleResp(SCliConn* conn) {
code = cliBuildRespFromCont(pReq, &resp, pHead);
STraceId* trace = &resp.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%" PRId64 ", sid:%" PRId64 ", code:%s",
tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, seq:%" PRId64 ", sid:%" PRId64 ", code:%s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq, qId,
tstrerror(pHead->code));
code = cliNotifyCb(conn, pReq, &resp);
@ -760,7 +760,7 @@ void cliConnTimeout(uv_timer_t* handle) {
}
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
tTrace("%s conn %p failed to connect %s since conn timeout", CONN_GET_INST_LABEL(conn), conn, conn->dstAddr);
tTrace("%s conn:%p, failed to connect %s since conn timeout", CONN_GET_INST_LABEL(conn), conn, conn->dstAddr);
TAOS_UNUSED(transUnrefCliHandle(conn));
}
@ -899,7 +899,7 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, task);
}
tDebug("conn %p get from pool, pool size:%d, dst:%s", conn, conn->list->size, conn->dstAddr);
tDebug("conn:%p, get from pool, pool size:%d, dst:%s", conn, conn->list->size, conn->dstAddr);
*ppConn = conn;
return 0;
@ -941,7 +941,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
tDebug("conn:%p, added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
conn->heapMissHit = 0;
@ -961,7 +961,7 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_
SConnBuffer* pBuf = &conn->readBuf;
int32_t code = transAllocBuffer(pBuf, buf);
if (code < 0) {
tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code));
tError("conn:%p, failed to alloc buffer, since %s", conn, tstrerror(code));
}
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
@ -975,14 +975,14 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SCliConn* conn = handle->data;
code = transSetReadOption((uv_handle_t*)handle);
if (code != 0) {
tWarn("%s conn %p failed to set recv opt since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tWarn("%s conn:%p, failed to set recv opt since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
tTrace("%s conn:%p, read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
conn->broken = true;
TAOS_UNUSED(transUnrefCliHandle(conn));
@ -999,11 +999,11 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
// read(2).
tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
tTrace("%s conn:%p, read empty", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (nread < 0) {
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
tDebug("%s conn:%p, read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
transGetRefCount(conn));
conn->broken = true;
TAOS_UNUSED(transUnrefCliHandle(conn));
@ -1136,10 +1136,10 @@ static void cliDestroyAllQidFromThrd(SCliConn* conn) {
code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
if (code != 0) {
tDebug("%s conn %p failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid,
tDebug("%s conn:%p, failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid,
tstrerror(code));
} else {
tDebug("%s conn %p destroy sid::%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid);
tDebug("%s conn:%p, destroy sid::%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid);
}
STransCtx* ctx = pIter;
@ -1163,17 +1163,17 @@ static void cliDestroy(uv_handle_t* handle) {
SCliThrd* pThrd = conn->hostThrd;
cliResetConnTimer(conn);
tDebug("%s conn %p try to destroy", CONN_GET_INST_LABEL(conn), conn);
tDebug("%s conn:%p, try to destroy", CONN_GET_INST_LABEL(conn), conn);
code = destroyAllReqs(conn);
if (code != 0) {
tDebug("%s conn %p failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tDebug("%s conn:%p, failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
conn->forceDelFromHeap = 1;
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
if (code != 0) {
tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tDebug("%s conn:%p, failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
taosMemoryFree(conn->dstAddr);
@ -1190,7 +1190,7 @@ static void cliDestroy(uv_handle_t* handle) {
destroyWQ(&conn->wq);
transDestroyBuffer(&conn->readBuf);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
tTrace("%s conn:%p, destroy successfully", CONN_GET_INST_LABEL(conn), conn);
taosMemoryFree(conn);
}
@ -1217,12 +1217,12 @@ static void notifyAndDestroyReq(SCliConn* pConn, SCliReq* pReq, int32_t code) {
}
STraceId* trace = &resp.info.traceId;
tDebug("%s conn %p notify user and destroy msg %s since %s", CONN_GET_INST_LABEL(pConn), pConn,
tDebug("%s conn:%p, notify user and destroy msg %s since %s", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msg.msgType), tstrerror(resp.code));
// handle noresp and inter manage msg
if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) {
tDebug("%s conn %p destroy %s msg directly since %s", CONN_GET_INST_LABEL(pConn), pConn,
tDebug("%s conn:%p, destroy %s msg directly since %s", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msg.msgType), tstrerror(resp.code));
destroyReq(pReq);
return;
@ -1271,7 +1271,7 @@ static void cliHandleException(SCliConn* conn) {
cliResetConnTimer(conn);
code = destroyAllReqs(conn);
if (code != 0) {
tError("%s conn %p failed to destroy all reqs on conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tError("%s conn:%p, failed to destroy all reqs on conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
cliDestroyAllQidFromThrd(conn);
@ -1288,13 +1288,13 @@ static void cliHandleException(SCliConn* conn) {
conn->forceDelFromHeap = 1;
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
if (code != 0) {
tError("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tError("%s conn:%p, failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
}
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// tTrace("%s conn:%p, fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
@ -1346,7 +1346,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
cliConnRmReqs(conn);
if (status != 0) {
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
tDebug("%s conn:%p, failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
TAOS_UNUSED(transUnrefCliHandle(conn));
return;
}
@ -1355,7 +1355,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
if (conn->readerStart == 0) {
code = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
if (code != 0) {
tDebug("%s conn %p failed to start read since%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tDebug("%s conn:%p, failed to start read since%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(conn));
return;
}
@ -1365,7 +1365,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
if (!cliMayRecycleConn(conn)) {
code = cliBatchSend(conn, 1);
if (code != 0) {
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
tDebug("%s conn:%p, failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(conn));
}
}
@ -1435,7 +1435,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
}
QUEUE_PUSH(&pThrd->batchSendSet, &pConn->batchSendq);
pConn->inThreadSendq = 1;
tDebug("%s conn %p batch send later", pInst->label, pConn);
tDebug("%s conn:%p, batch send later", pInst->label, pConn);
return 0;
}
@ -1443,7 +1443,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
int32_t totalLen = 0;
if (size == 0) {
tDebug("%s conn %p not msg to send", pInst->label, pConn);
tDebug("%s conn:%p, not msg to send", pInst->label, pConn);
return 0;
}
uv_buf_t* wb = NULL;
@ -1520,7 +1520,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
pCliMsg->sent = 1;
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
tGDebug("%s conn:%p, %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
@ -1536,7 +1536,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
if (req == NULL) {
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
tError("%s conn:%p, failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
while (!QUEUE_IS_EMPTY(&reqToSend)) {
queue* h = QUEUE_HEAD(&reqToSend);
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
@ -1550,11 +1550,11 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
SWReqsWrapper* pWreq = req->data;
QUEUE_MOVE(&reqToSend, &pWreq->node);
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
tDebug("%s conn:%p, start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
if (ret != 0) {
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
tError("%s conn:%p, failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
queue* h = QUEUE_HEAD(&pWreq->node);
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
@ -1617,23 +1617,23 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(conn->port);
tTrace("%s conn %p try to connect to %s", pInst->label, conn, conn->dstAddr);
tTrace("%s conn:%p, try to connect to %s", pInst->label, conn, conn->dstAddr);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd < 0) {
TAOS_CHECK_GOTO(terrno, &lino, _exception1);
}
tTrace("%s conn %p fd %d openend", pInst->label, conn, fd);
tTrace("%s conn:%p, fd %d openend", pInst->label, conn, fd);
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) {
tError("%s conn %p failed to set stream since %s", transLabel(pInst), conn, uv_err_name(ret));
tError("%s conn:%p, failed to set stream since %s", transLabel(pInst), conn, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
}
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
if (ret != 0) {
tError("%s conn %p failed to set socket opt since %s", transLabel(pInst), conn, uv_err_name(ret));
tError("%s conn:%p, failed to set socket opt since %s", transLabel(pInst), conn, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
return code;
}
@ -1656,19 +1656,19 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
transRefCliHandle(conn);
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
if (ret != 0) {
tError("%s conn %p failed to start timer since %s", transLabel(pInst), conn, uv_err_name(ret));
tError("%s conn:%p, failed to start timer since %s", transLabel(pInst), conn, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2);
}
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
_exception1:
tError("%s conn %p failed to do connect since %s", transLabel(pInst), conn, tstrerror(code));
tError("%s conn:%p, failed to do connect since %s", transLabel(pInst), conn, tstrerror(code));
cliDestroyConn(conn, true);
return code;
_exception2:
TAOS_UNUSED(transUnrefCliHandle(conn));
tError("%s conn %p failed to do connect since %s", transLabel(pInst), conn, tstrerror(code));
tError("%s conn:%p, failed to do connect since %s", transLabel(pInst), conn, tstrerror(code));
return code;
}
@ -1738,7 +1738,7 @@ void cliConnCb(uv_connect_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
if (status != 0) {
tError("%s conn %p failed to connect to %s since %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
tError("%s conn:%p, failed to connect to %s since %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
uv_strerror(status));
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
TAOS_UNUSED(transUnrefCliHandle(pConn));
@ -1747,14 +1747,14 @@ void cliConnCb(uv_connect_t* req, int status) {
pConn->connnected = 1;
code = cliConnSetSockInfo(pConn);
if (code != 0) {
tDebug("%s conn %p failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
tDebug("%s conn:%p, failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
tTrace("%s conn:%p, connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
code = cliBatchSend(pConn, 1);
if (code != 0) {
tDebug("%s conn %p failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
tDebug("%s conn:%p, failed to get sock info since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
}
@ -1952,7 +1952,7 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
SReqCtx* pCtx = pReq->ctx;
SCliThrd* pThrd = pConn->hostThrd;
if (pCtx == NULL) {
tDebug("%s conn %p not need to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn:%p, not need to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
return 0;
}
@ -1960,11 +1960,11 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
if (pUserCtx == NULL) {
pCtx->userCtx.st = taosGetTimestampUs();
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("%s conn %p succ to add statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn:%p, succ to add statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
} else {
transCtxMerge(pUserCtx, &pCtx->userCtx);
pUserCtx->st = taosGetTimestampUs();
tDebug("%s conn %p succ to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn:%p, succ to update statue ctx, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
}
return 0;
}
@ -1987,12 +1987,12 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
transReleaseExHandle(transGetRefMgt(), qid);
return TSDB_CODE_RPC_STATE_DROPED;
}
tDebug("%s conn %p failed to get statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn:%p, failed to get statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
transReleaseExHandle(transGetRefMgt(), qid);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} else {
*pConn = pState->conn;
tDebug("%s conn %p succ to get conn of statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn:%p, succ to get conn of statue, sid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid);
}
transReleaseExHandle(transGetRefMgt(), qid);
return 0;
@ -2010,10 +2010,10 @@ int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) {
SReqState state = {.conn = pConn, .arg = NULL};
code = taosHashPut(pThrd->pIdConnTable, &qid, sizeof(qid), &state, sizeof(state));
if (code != 0) {
tDebug("%s conn %p failed to statue, sid:%" PRId64 " since %s", transLabel(pThrd->pInst), pConn, qid,
tDebug("%s conn:%p, failed to statue, sid:%" PRId64 " since %s", transLabel(pThrd->pInst), pConn, qid,
tstrerror(code));
} else {
tDebug("%s conn %p succ to add statue, sid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn:%p, succ to add statue, sid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid);
}
TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq));
@ -2050,7 +2050,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
} else if (code == 0) {
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
if (code != 0) {
tWarn("%s conn %p failed to added to heap cache since %s", pInst->label, pConn, tstrerror(code));
tWarn("%s conn:%p, failed to added to heap cache since %s", pInst->label, pConn, tstrerror(code));
}
} else {
if (code == TSDB_CODE_OUT_OF_MEMORY && pConn == NULL) {
@ -2063,11 +2063,11 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
}
code = cliSendReq(pConn, pReq);
if (code != 0) {
tWarn("%s conn %p failed to send req since %s", pInst->label, pConn, tstrerror(code));
tWarn("%s conn:%p, failed to send req since %s", pInst->label, pConn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
tTrace("%s conn %p ready", pInst->label, pConn);
tTrace("%s conn:%p, ready", pInst->label, pConn);
return;
_exception:
@ -2110,7 +2110,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) {
QUEUE_INIT(&conn->batchSendq);
code = cliBatchSend(conn, 1);
if (code != 0) {
tWarn("%s conn %p failed to send req since %s", pThrd->pInst->label, conn, tstrerror(code));
tWarn("%s conn:%p, failed to send req since %s", pThrd->pInst->label, conn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(conn));
}
}
@ -2160,7 +2160,7 @@ static FORCE_INLINE void destroyReq(void* arg) {
removeReqFromSendQ(pReq);
STraceId* trace = &pReq->msg.info.traceId;
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
tGTrace("free memory:%p, free ctx:%p", pReq, pReq->ctx);
if (pReq->ctx) {
destroyReqCtx(pReq->ctx);
@ -2517,7 +2517,7 @@ static FORCE_INLINE void doDelayTask(void* param) {
static FORCE_INLINE void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
tDebug("%s conn:%p, idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL;
taosMemoryFree(arg);
@ -2559,7 +2559,7 @@ static FORCE_INLINE void cliPerfLog_epset(SCliConn* pConn, SCliReq* pReq) {
tWarn("failed to debug epset since %s", tstrerror(code));
return;
}
tTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
tTrace("%s conn:%p, extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
return;
}
@ -2846,10 +2846,10 @@ int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
return 0;
}
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
tGTrace("%s conn:%p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pSem) {
if (pCtx->pRsp == NULL) {
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
tGTrace("%s conn:%p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
} else {
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
}
@ -2875,7 +2875,7 @@ int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
}
}
} else {
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
tGTrace("%s conn:%p, handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pResp->info.hasEpSet == 1) {
SEpSet epset = {0};
if (transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset) != 0) {
@ -2938,7 +2938,7 @@ void transRefCliHandle(void* handle) {
SCliConn* conn = (SCliConn*)handle;
conn->ref++;
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL(conn), conn, conn->ref);
tTrace("%s conn:%p, ref %d", CONN_GET_INST_LABEL(conn), conn, conn->ref);
}
int32_t transUnrefCliHandle(void* handle) {
if (handle == NULL) {
@ -2949,7 +2949,7 @@ int32_t transUnrefCliHandle(void* handle) {
conn->ref--;
ref = conn->ref;
tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL(conn), conn, conn->ref);
tTrace("%s conn:%p, ref:%d", CONN_GET_INST_LABEL(conn), conn, conn->ref);
if (conn->ref == 0) {
cliDestroyConn(conn, true);
}
@ -3615,7 +3615,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set)
if (((*st - pCtx->st) / 1000000) >= pInst->readTimeout) {
code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid));
if (code != 0) {
tError("%s conn %p failed to remove state sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tError("%s conn:%p, failed to remove state sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tstrerror(code));
}
@ -3624,11 +3624,11 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set)
if (taosArrayPush(pQIdBuf, qid) == NULL) {
code = terrno;
tError("%s conn %p failed to add sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tError("%s conn:%p, failed to add sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tstrerror(code));
break;
}
tWarn("%s conn %p remove timeout msg sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, *qid);
tWarn("%s conn:%p, remove timeout msg sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, *qid);
}
pIter = taosHashIterate(pConn->pQTable, pIter);
}
@ -3642,7 +3642,7 @@ static void cliConnRemoveTimoutQidMsg(SCliConn* pConn, int64_t* st, queue* set)
transCtxCleanup(p);
code = taosHashRemove(pConn->pQTable, qid, sizeof(*qid));
if (code != 0) {
tError("%s conn %p failed to drop ctx of sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tError("%s conn:%p, failed to drop ctx of sid:%" PRId64 " since %s", CONN_GET_INST_LABEL(pConn), pConn, *qid,
tstrerror(code));
}
}
@ -3674,7 +3674,7 @@ static int8_t cliConnRemoveTimeoutMsg(SCliConn* pConn) {
if (QUEUE_IS_EMPTY(&set)) {
return 0;
}
tWarn("%s conn %p do remove timeout msg", pInst->label, pConn);
tWarn("%s conn:%p, do remove timeout msg", pInst->label, pConn);
destroyReqInQueue(pConn, &set, TSDB_CODE_RPC_TIMEOUT);
return 1;
}
@ -3682,7 +3682,7 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt);
tDebug("get conn:%p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt);
int32_t reqsNum = transQueueSize(&pConn->reqsToSend);
int32_t reqsSentOut = transQueueSize(&pConn->reqsSentOut);
int32_t stateNum = taosHashGetSize(pConn->pQTable);
@ -3694,14 +3694,14 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
if (pConn->list == NULL && pConn->dstAddr != NULL) {
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
if (pConn->list != NULL) {
tTrace("conn %p get list %p from pool for key:%s", pConn, pConn->list, key);
tTrace("conn:%p, get list %p from pool for key:%s", pConn, pConn->list, key);
}
}
if (pConn->list && pConn->list->totalSize >= pInst->connLimitNum / 4) {
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
tWarn("%s conn:%p, try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
if (cliConnRemoveTimeoutMsg(pConn)) {
tWarn("%s conn %p succ to remove timeout msg", transLabel(pInst), pConn);
tWarn("%s conn:%p, succ to remove timeout msg", transLabel(pInst), pConn);
}
return 1;
}
@ -3724,7 +3724,7 @@ static FORCE_INLINE void logConnMissHit(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
pConn->heapMissHit++;
tDebug("conn %p has %d reqs, %d sentout and %d status in process, total limit:%d, switch to other conn", pConn,
tDebug("conn:%p, has %d reqs, %d sentout and %d status in process, total limit:%d, switch to other conn", pConn,
transQueueSize(&pConn->reqsToSend), transQueueSize(&pConn->reqsSentOut), taosHashGetSize(pConn->pQTable),
pInst->shareConnLimit);
// if (transQueueSize(&pConn->reqsSentOut) >= pInst->shareConnLimit) {
@ -3745,12 +3745,12 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
tTrace("failed to get conn from heap cache for key:%s", key);
return NULL;
} else {
tTrace("conn %p get conn from heap cache for key:%s", pConn, key);
tTrace("conn:%p, get conn from heap cache for key:%s", pConn, key);
if (shouldSWitchToOtherConn(pConn, key)) {
SCliConn* pNewConn = NULL;
code = balanceConnHeapCache(pConnHeapCache, pConn, &pNewConn);
if (code == 1) {
tTrace("conn %p start to handle reqs", pNewConn);
tTrace("conn:%p, start to handle reqs", pNewConn);
return pNewConn;
}
return NULL;
@ -3765,7 +3765,7 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
if (pConn->heap != NULL) {
p = pConn->heap;
tTrace("conn %p add to heap cache for key:%s, status:%d, refCnt:%d, add direct", pConn, pConn->dstAddr,
tTrace("conn:%p, add to heap cache for key:%s, status:%d, refCnt:%d, add direct", pConn, pConn->dstAddr,
pConn->inHeap, pConn->reqRefCnt);
} else {
code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
@ -3781,14 +3781,14 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
}
code = transHeapInsert(p, pConn);
tTrace("conn %p add to heap cache for key:%s, status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
tTrace("conn:%p, add to heap cache for key:%s, status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
pConn->reqRefCnt);
return code;
}
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
if (pConn->heap != NULL) {
tTrace("conn %p try to delete from heap cache direct", pConn);
tTrace("conn:%p, try to delete from heap cache direct", pConn);
return transHeapDelete(pConn->heap, pConn);
}
@ -3799,7 +3799,7 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
}
int32_t code = transHeapDelete(p, pConn);
if (code != 0) {
tTrace("conn %p failed delete from heap cache since %s", pConn, tstrerror(code));
tTrace("conn:%p, failed delete from heap cache since %s", pConn, tstrerror(code));
}
return code;
}
@ -3864,7 +3864,7 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
// impl later
p->reqRefCnt++;
if (p->inHeap == 1) {
tTrace("failed to insert conn %p since already in heap", p);
tTrace("failed to insert conn:%p since already in heap", p);
return TSDB_CODE_DUP_KEY;
}
@ -3881,12 +3881,12 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
}
if (p->inHeap == 0) {
tTrace("failed to del conn %p since not in heap", p);
tTrace("failed to del conn:%p since not in heap", p);
return 0;
} else {
int64_t now = taosGetTimestampMs();
if (p->forceDelFromHeap == 0 && now - p->lastAddHeapTime < 10000) {
tTrace("conn %p not added/delete to heap frequently", p);
tTrace("conn:%p, not added/delete to heap frequently", p);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
}
}
@ -3895,11 +3895,11 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
p->reqRefCnt--;
if (p->reqRefCnt == 0) {
heapRemove(heap->heap, &p->node);
tTrace("conn %p delete from heap", p);
tTrace("conn:%p, delete from heap", p);
} else if (p->reqRefCnt < 0) {
tTrace("conn %p has %d reqs, not delete from heap,assert", p, p->reqRefCnt);
tTrace("conn:%p, has %d reqs, not delete from heap,assert", p, p->reqRefCnt);
} else {
tTrace("conn %p has %d reqs, not delete from heap", p, p->reqRefCnt);
tTrace("conn:%p, has %d reqs, not delete from heap", p, p->reqRefCnt);
}
return 0;
}

View File

@ -215,7 +215,7 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
SConnBuffer* pBuf = &conn->readBuf;
int32_t code = transAllocBuffer(pBuf, buf);
if (code < 0) {
tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code));
tError("conn:%p, failed to alloc buffer, since %s", conn, tstrerror(code));
}
}
@ -393,24 +393,24 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (pConn->status == ConnNormal && pHead->noResp == 0) {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64
tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64
", sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", sid:%" PRId64 "",
tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
}
} else {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, "
"%s conn:%p, %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, "
"seqNum:%" PRId64 ", sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64
tGDebug("%s conn:%p, %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64
", "
"sid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
@ -440,23 +440,23 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if (pHead->msgType == TDMT_SCH_TASK_RELEASE) {
int64_t qId = taosHton64(pHead->qid);
if (qId <= 0) {
tError("conn %p recv release, but invalid sid:%" PRId64 "", pConn, qId);
tError("conn:%p, recv release, but invalid sid:%" PRId64 "", pConn, qId);
code = TSDB_CODE_RPC_NO_STATE;
} else {
void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId));
if (p == NULL) {
code = TSDB_CODE_RPC_NO_STATE;
tTrace("conn %p recv release, and releady release by server sid:%" PRId64 "", pConn, qId);
tTrace("conn:%p, recv release, and releady release by server sid:%" PRId64 "", pConn, qId);
} else {
SSvrRegArg* arg = p;
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p recv release, notify server app, sid:%" PRId64 "", pConn, qId);
tTrace("conn:%p, recv release, notify server app, sid:%" PRId64 "", pConn, qId);
code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
if (code != 0) {
tDebug("conn %p failed to remove sid:%" PRId64 "", pConn, qId);
tDebug("conn:%p, failed to remove sid:%" PRId64 "", pConn, qId);
}
tTrace("conn %p clear state,sid:%" PRId64 "", pConn, qId);
tTrace("conn:%p, clear state,sid:%" PRId64 "", pConn, qId);
}
}
@ -468,7 +468,7 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
if (srvMsg == NULL) {
tError("conn %p recv release, failed to send release-resp since %s", pConn, tstrerror(terrno));
tError("conn:%p, recv release, failed to send release-resp since %s", pConn, tstrerror(terrno));
taosMemoryFree(pHead);
return terrno;
}
@ -496,7 +496,7 @@ bool uvConnMayGetUserInfo(SSvrConn* pConn, STransMsgHead** ppHead, int32_t* msgL
if (pHead->withUserInfo) {
STransMsgHead* tHead = taosMemoryCalloc(1, len - sizeof(pInst->user));
if (tHead == NULL) {
tError("conn %p failed to get user info since %s", pConn, tstrerror(terrno));
tError("conn:%p, failed to get user info since %s", pConn, tstrerror(terrno));
return false;
}
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
@ -524,28 +524,28 @@ static bool uvHandleReq(SSvrConn* pConn) {
int8_t resetBuf = 0;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, 0);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pInst), pConn);
tError("%s conn:%p, read invalid packet", transLabel(pInst), pConn);
return false;
}
if (transDecompressMsg((char**)&pHead, &msgLen) < 0) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn);
tError("%s conn:%p, recv invalid packet, failed to decompress", transLabel(pInst), pConn);
taosMemoryFree(pHead);
return false;
}
if (uvConnMayGetUserInfo(pConn, &pHead, &msgLen) == true) {
tDebug("%s conn %p get user info", transLabel(pInst), pConn);
tDebug("%s conn:%p, get user info", transLabel(pInst), pConn);
} else {
if (pConn->userInited == 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p failed get user info since %s", transLabel(pInst), pConn, tstrerror(terrno));
tDebug("%s conn:%p, failed get user info since %s", transLabel(pInst), pConn, tstrerror(terrno));
return false;
}
tDebug("%s conn %p no need get user info", transLabel(pInst), pConn);
tDebug("%s conn:%p, no need get user info", transLabel(pInst), pConn);
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pInst), pConn);
tTrace("%s conn:%p, not reset read buf", transLabel(pInst), pConn);
}
pHead->code = htonl(pHead->code);
@ -573,7 +573,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
if (pHead->seqNum == 0) {
STraceId* trace = &pHead->traceId;
tGError("%s conn %p received invalid seqNum, msgType:%s", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType));
tGError("%s conn:%p, received invalid seqNum, msgType:%s", transLabel(pInst), pConn, TMSG_INFO(pHead->msgType));
return false;
}
@ -623,7 +623,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
code = transSetReadOption((uv_handle_t*)cli);
if (code != 0) {
tWarn("%s conn %p failed to set recv opt since %s", transLabel(pInst), conn, tstrerror(code));
tWarn("%s conn:%p, failed to set recv opt since %s", transLabel(pInst), conn, tstrerror(code));
}
SConnBuffer* pBuf = &conn->readBuf;
@ -632,7 +632,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) {
if (true == pBuf->invalid || false == uvHandleReq(conn)) {
tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pInst), conn, conn->dst,
tError("%s conn:%p, read invalid packet, received from %s, local info:%s", transLabel(pInst), conn, conn->dst,
conn->src);
conn->broken = true;
transUnrefSrvHandle(conn);
@ -641,7 +641,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
}
return;
} else {
tError("%s conn %p read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pInst), conn,
tError("%s conn:%p, read invalid packet, exceed limit, received from %s, local info:%s", transLabel(pInst), conn,
conn->dst, conn->src);
transUnrefSrvHandle(conn);
return;
@ -651,7 +651,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return;
}
tDebug("%s conn %p read error:%s", transLabel(pInst), conn, uv_err_name(nread));
tDebug("%s conn:%p, read error:%s", transLabel(pInst), conn, uv_err_name(nread));
if (nread < 0) {
conn->broken = true;
transUnrefSrvHandle(conn);
@ -668,7 +668,7 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) {
// opt
SSvrConn* pConn = handle->data;
tError("conn %p time out", pConn);
tError("conn:%p, time out", pConn);
}
void uvOnSendCb(uv_write_t* req, int status) {
@ -683,7 +683,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
freeWReqToWQ(&conn->wq, wrapper);
tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn);
tDebug("%s conn:%p, send data out ", transLabel(conn->pInst), conn);
if (status == 0) {
while (!QUEUE_IS_EMPTY(&src)) {
queue* head = QUEUE_HEAD(&src);
@ -691,7 +691,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(conn->pInst), conn,
tGDebug("%s conn:%p, msg already send out, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg);
}
@ -702,7 +702,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", sid:%" PRId64 " since %s", transLabel(conn->pInst), conn,
tGDebug("%s conn:%p, failed to send, seqNum:%" PRId64 ", sid:%" PRId64 " since %s", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
destroySmsg(smsg);
}
@ -770,7 +770,7 @@ static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
}
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(pInst),
tGDebug("%s conn:%p, %s is sent to %s, local info:%s, len:%d, seqNum:%" PRId64 ", sid:%" PRId64 "", transLabel(pInst),
pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len, pMsg->info.seqNum, pMsg->info.qId);
wb->base = (char*)pHead;
@ -780,7 +780,7 @@ static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* toSendQ) {
int32_t code = 0;
int32_t size = transQueueSize(&pConn->resps);
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size);
tDebug("%s conn:%p, has %d msg to send", transLabel(pConn->pInst), pConn, size);
if (size == 0) {
return 0;
}
@ -828,13 +828,13 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
}
int32_t size = transQueueSize(&pConn->resps);
if (size == 0) {
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size);
tDebug("%s conn:%p, has %d msg to send", transLabel(pConn->pInst), pConn, size);
return;
}
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
if (req == NULL) {
uError("%s conn %p failed to alloc write req since %s", transLabel(pConn->pInst), pConn, tstrerror(terrno));
uError("%s conn:%p, failed to alloc write req since %s", transLabel(pConn->pInst), pConn, tstrerror(terrno));
transUnrefSrvHandle(pConn);
return;
}
@ -844,11 +844,11 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
int32_t bufNum = 0;
code = uvBuildToSendData(pConn, &pBuf, &bufNum, &pWreq->node);
if (code != 0) {
tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn);
tError("%s conn:%p, failed to send data", transLabel(pConn->pInst), pConn);
return;
}
if (bufNum == 0) {
tDebug("%s conn %p no data to send", transLabel(pConn->pInst), pConn);
tDebug("%s conn:%p, no data to send", transLabel(pConn->pInst), pConn);
return;
}
@ -856,7 +856,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
int32_t ret = uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
if (ret != 0) {
tError("conn %p failed to write data since %s", pConn, uv_err_name(ret));
tError("conn:%p, failed to write data since %s", pConn, uv_err_name(ret));
pConn->broken = true;
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
queue* head = QUEUE_HEAD(&pWreq->node);
@ -876,13 +876,13 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
SSvrRegArg* p = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (p == NULL) {
tError("%s conn %p already release sid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid);
tError("%s conn:%p, already release sid:%" PRId64 "", transLabel(pConn->pInst), pConn, qid);
return TSDB_CODE_RPC_NO_STATE;
} else {
transFreeMsg(p->msg.pCont);
code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid));
if (code != 0) {
tError("%s conn %p failed to release sid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid,
tError("%s conn:%p, failed to release sid:%" PRId64 " since %s", transLabel(pConn->pInst), pConn, qid,
tstrerror(code));
}
}
@ -1001,18 +1001,18 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
// only auth conn currently, add more func later
tTrace("conn %p start to be processed in BG Thread", req->data);
tTrace("conn:%p, start to be processed in BG Thread", req->data);
return;
}
static void uvWorkAfterTask(uv_work_t* req, int status) {
if (status != 0) {
tTrace("conn %p failed to processed ", req->data);
tTrace("conn:%p, failed to processed ", req->data);
}
// Done time-consumeing task
// add more func later
// this func called in main loop
tTrace("conn %p already processed ", req->data);
tTrace("conn:%p, already processed ", req->data);
taosMemoryFree(req);
}
@ -1112,18 +1112,18 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if ((code = uv_accept(q, (uv_stream_t*)(pConn->pTcp))) == 0) {
uv_os_fd_t fd;
TAOS_UNUSED(uv_fileno((const uv_handle_t*)pConn->pTcp, &fd));
tTrace("conn %p created, fd:%d", pConn, fd);
tTrace("conn:%p, created, fd:%d", pConn, fd);
struct sockaddr_storage peername, sockname;
// Get and valid the peer info
int addrlen = sizeof(peername);
if ((code = uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&peername, &addrlen)) != 0) {
tError("conn %p failed to get peer info since %s", pConn, uv_strerror(code));
tError("conn:%p, failed to get peer info since %s", pConn, uv_strerror(code));
transUnrefSrvHandle(pConn);
return;
}
if (peername.ss_family != AF_INET) {
tError("conn %p failed to get peer info since not support other protocol except ipv4", pConn);
tError("conn:%p, failed to get peer info since not support other protocol except ipv4", pConn);
transUnrefSrvHandle(pConn);
return;
}
@ -1132,12 +1132,12 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
// Get and valid the sock info
addrlen = sizeof(sockname);
if ((code = uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) != 0) {
tError("conn %p failed to get local info since %s", pConn, uv_strerror(code));
tError("conn:%p, failed to get local info since %s", pConn, uv_strerror(code));
transUnrefSrvHandle(pConn);
return;
}
if (sockname.ss_family != AF_INET) {
tError("conn %p failed to get sock info since not support other protocol except ipv4", pConn);
tError("conn:%p, failed to get sock info since not support other protocol except ipv4", pConn);
transUnrefSrvHandle(pConn);
return;
}
@ -1156,7 +1156,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
}
code = uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
if (code != 0) {
tWarn("conn %p failed to start to read since %s", pConn, uv_err_name(code));
tWarn("conn:%p, failed to start to read since %s", pConn, uv_err_name(code));
transUnrefSrvHandle(pConn);
return;
}
@ -1341,7 +1341,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
pConn->refId = exh->refId;
QUEUE_INIT(&exh->q);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
tTrace("%s handle %p, conn:%p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
pConn->pQTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pConn->pQTable == NULL) {
@ -1405,7 +1405,7 @@ static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->pTcp)) {
tTrace("conn %p to be destroyed", conn);
tTrace("conn:%p, to be destroyed", conn);
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
}
}
@ -1421,7 +1421,7 @@ void uvConnDestroyAllState(SSvrConn* p) {
SSvrRegArg* arg = pIter;
int64_t* qid = taosHashGetKey(pIter, NULL);
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
tTrace("conn %p broken, notify server app, sid:%" PRId64 "", p, *qid);
tTrace("conn:%p, broken, notify server app, sid:%" PRId64 "", p, *qid);
pIter = taosHashIterate(pQTable, pIter);
}
@ -1441,7 +1441,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId);
STrans* pInst = thrd->pInst;
tDebug("%s conn %p destroy", transLabel(pInst), conn);
tDebug("%s conn:%p, destroy", transLabel(pInst), conn);
transQueueDestroy(&conn->resps);
@ -1718,7 +1718,7 @@ void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) { return; }
void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) {
// send msg to client
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);
tDebug("%s conn:%p, start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);
uvStartSendResp(msg);
}
@ -1726,7 +1726,7 @@ int32_t uvHandleStateReq(SSvrRespMsg* msg) {
int32_t code = 0;
SSvrConn* conn = msg->pConn;
int64_t qid = msg->msg.info.qId;
tDebug("%s conn %p start to register brokenlink callback, sid:%" PRId64 "", transLabel(conn->pInst), conn, qid);
tDebug("%s conn:%p, start to register brokenlink callback, sid:%" PRId64 "", transLabel(conn->pInst), conn, qid);
SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg};
SSvrRegArg* p = taosHashGet(conn->pQTable, &qid, sizeof(qid));
@ -1735,12 +1735,12 @@ int32_t uvHandleStateReq(SSvrRespMsg* msg) {
}
code = taosHashPut(conn->pQTable, &qid, sizeof(qid), &arg, sizeof(arg));
if (code == 0) tDebug("conn %p register brokenlink callback succ", conn);
if (code == 0) tDebug("conn:%p, register brokenlink callback succ", conn);
return code;
}
void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd) {
SSvrConn* conn = msg->pConn;
tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn);
tDebug("%s conn:%p, register brokenlink callback", transLabel(thrd->pInst), conn);
int32_t code = uvHandleStateReq(msg);
taosMemoryFree(msg);
}
@ -1859,7 +1859,7 @@ void transRefSrvHandle(void* handle) {
}
SSvrConn* pConn = handle;
pConn->ref++;
tTrace("conn %p ref count:%d", pConn, pConn->ref);
tTrace("conn:%p, ref count:%d", pConn, pConn->ref);
}
void transUnrefSrvHandle(void* handle) {
@ -1868,7 +1868,7 @@ void transUnrefSrvHandle(void* handle) {
}
SSvrConn* pConn = handle;
pConn->ref--;
tTrace("conn %p ref count:%d", pConn, pConn->ref);
tTrace("conn:%p, ref count:%d", pConn, pConn->ref);
if (pConn->ref == 0) {
destroyConn((SSvrConn*)handle, true);
}
@ -1903,7 +1903,7 @@ int32_t transReleaseSrvHandle(void* handle, int32_t status) {
m->msg = tmsg;
m->type = Normal;
tDebug("%s conn %p start to send %s, sid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
tDebug("%s conn:%p, start to send %s, sid:%" PRId64 "", transLabel(pThrd->pInst), exh->handle, TMSG_INFO(tmsg.msgType),
qId);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
@ -1958,7 +1958,7 @@ int32_t transSendResponse(const STransMsg* msg) {
m->type = Normal;
STraceId* trace = (STraceId*)&msg->info.traceId;
tGDebug("conn %p start to send resp (1/2)", exh->handle);
tGDebug("conn:%p, start to send resp (1/2)", exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
transReleaseExHandle(msg->info.refIdMgt, refId);
@ -2006,7 +2006,7 @@ int32_t transRegisterMsg(const STransMsg* msg) {
m->type = Register;
STrans* pInst = pThrd->pInst;
tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle);
tDebug("%s conn:%p, start to register brokenlink callback", transLabel(pInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m);
transReleaseExHandle(msg->info.refIdMgt, refId);