diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fd57eef83a..6ab962e13f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -23,6 +23,7 @@ extern "C" { #include #include "taosdef.h" #include "tmsg.h" +#include "ttrace.h" #define TAOS_CONN_SERVER 0 #define TAOS_CONN_CLIENT 1 @@ -41,10 +42,12 @@ typedef struct { typedef struct SRpcHandleInfo { // rpc info - void * handle; // rpc handle returned to app - int64_t refId; // refid, used by server - int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); - int32_t persistHandle; // persist handle or not + void * handle; // rpc handle returned to app + int64_t refId; // refid, used by server + int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); + int32_t persistHandle; // persist handle or not + STraceId traceId; + // int64_t traceId; SRpcConnInfo connInfo; // app info diff --git a/include/util/ttrace.h b/include/util/ttrace.h index aceb287aed..b6bf9bb6f7 100644 --- a/include/util/ttrace.h +++ b/include/util/ttrace.h @@ -21,18 +21,33 @@ extern "C" { #endif -typedef int64_t STraceId; -typedef int32_t STraceSubId; +#pragma(push, 1) +typedef struct { + int64_t rootId; + int64_t msgId; +} STraceId; -STraceId traceInitId(STraceSubId *h, STraceSubId *l); +#pragma(pop) -void traceId2Str(STraceId *id, char *buf); +#define TRACE_SET_ROOTID(traceId, root) \ + do { \ + (traceId)->rootId = root; \ + } while (0); -void traceSetSubId(STraceId *id, int32_t *subId); +#define TRACE_GET_ROOTID(traceId) (traceId)->rootId -STraceSubId traceGetParentId(STraceId *id); +#define TRACE_SET_MSGID(traceId, mId) \ + do { \ + (traceId)->msgId = mId; \ + } while (0) + +#define TRACE_GET_MSGID(traceId) (traceId)->msgId + +#define TRACE_TO_STR(traceId, buf) \ + do { \ + sprintf(buf, "0x%" PRIx64 ": 0x%" PRIx64 "", traceId->rootId, traceId->msgId); \ + } while (0) -STraceSubId traceGenSubId(); #ifdef __cplusplus } #endif diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index a5a499aaf5..c945ca00fd 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -154,7 +154,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra .info.persistHandle = persistHandle, .code = 0}; assert(pInfo->fp != NULL); - + TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId); rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); return TSDB_CODE_SUCCESS; } @@ -208,14 +208,14 @@ void destroyQueryExecRes(SQueryExecRes* pRes) { switch (pRes->msgType) { case TDMT_VND_ALTER_TABLE: case TDMT_MND_ALTER_STB: { - tFreeSTableMetaRsp((STableMetaRsp *)pRes->res); + tFreeSTableMetaRsp((STableMetaRsp*)pRes->res); taosMemoryFreeClear(pRes->res); break; } case TDMT_VND_SUBMIT: { tFreeSSubmitRsp((SSubmitRsp*)pRes->res); break; - } + } case TDMT_VND_QUERY: { taosArrayDestroy((SArray*)pRes->res); break; @@ -224,5 +224,3 @@ void destroyQueryExecRes(SQueryExecRes* pRes) { qError("invalid exec result for request type %d", pRes->msgType); } } - - diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e680e30042..146b127422 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -26,6 +26,7 @@ extern "C" { #include "transLog.h" #include "transportInt.h" #include "trpc.h" +#include "ttrace.h" #include "tutil.h" typedef void* queue[2]; @@ -140,6 +141,7 @@ typedef struct { char spi : 2; char user[TSDB_UNI_LEN]; + STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later uint32_t msgType; diff --git a/source/libs/transport/inc/transLog.h b/source/libs/transport/inc/transLog.h index ebf231cfcf..02e2050283 100644 --- a/source/libs/transport/inc/transLog.h +++ b/source/libs/transport/inc/transLog.h @@ -22,6 +22,7 @@ extern "C" { // clang-format off #include "tlog.h" +#include "ttrace.h" #define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0) #define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0) @@ -30,6 +31,10 @@ extern "C" { #define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0) #define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0) #define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0) + +//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0) +#define tTR(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param " TRID: %s", __VA_ARGS__, buf);} while(0) + // clang-format on #ifdef __cplusplus } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 925de2f321..1ec96f4a7a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -163,6 +163,11 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { transSetDefaultAddr(thandle, ip, fqdn); } +// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) { +// SRpcHandleInfo* pInfo = &pMsg->info; +// pInfo->traceId = uid; +//} + int32_t rpcInit() { // impl later return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6759490510..a90091c6c6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -48,9 +48,10 @@ typedef struct SCliMsg { STransConnCtx* ctx; STransMsg msg; queue q; - uint64_t st; STransMsgType type; - int sent; //(0: no send, 1: alread sent) + + uint64_t st; + int sent; //(0: no send, 1: alread sent) } SCliMsg; typedef struct SCliThrdObj { @@ -167,10 +168,10 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ } while (0) -#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) -#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_HOST_THREAD_IDX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) +#define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ do { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ @@ -280,6 +281,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.code = pHead->code; transMsg.msgType = pHead->msgType; transMsg.info.ahandle = NULL; + transMsg.info.traceId = pHead->traceId; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; @@ -324,18 +326,20 @@ void cliHandleResp(SCliConn* conn) { transMsg.info.handle = conn; tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - - tDebug("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", pTransInst->label, conn, - TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), - taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); + // char buf[64] = {0}; + // TRACE_TO_STR(&transMsg.info.traceId, buf); + STraceId* trace = &transMsg.info.traceId; + tTR("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", conn, TMSG_INFO(pHead->msgType), + taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->localAddr.sin_addr), + ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { - tTrace("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); + tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); // transUnrefCliHandle(conn); return; } if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { - tTrace("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); + tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn)); // transUnrefCliHandle(conn); return; } @@ -639,11 +643,16 @@ void cliSend(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); + pHead->traceId = pMsg->info.traceId; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port)); + + // char buf[64] = {0}; + // TRACE_TO_STR(&pMsg->info.traceId, buf); + STraceId* trace = &pMsg->info.traceId; + tTR("conn %p %s is sent to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), + ntohs(pConn->localAddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -723,7 +732,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (conn != NULL) { tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } else { - tTrace("not found conn in conn pool %p", pThrd->pool); + tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); } } return conn; @@ -1007,17 +1016,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } + STraceId* trace = &pResp->info.traceId; if (pCtx->pSem != NULL) { - tTrace("%s conn %p(sync) handle resp", pTransInst->label, pConn); + tTR("conn %p(sync) handle resp", pConn); if (pCtx->pRsp == NULL) { - tTrace("%s conn %p(sync) failed to resp, ignore", pTransInst->label, pConn); + tTR("conn %p(sync) failed to resp, ignore", pConn); } else { memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); } tsem_post(pCtx->pSem); pCtx->pRsp = NULL; } else { - tTrace("%s conn %p handle resp", pTransInst->label, pConn); + tTR("conn %p handle resp", pConn); if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) { pTransInst->cfp(pTransInst->parent, pResp, NULL); } else { @@ -1074,16 +1084,17 @@ void transReleaseCliHandle(void* handle) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->info.handle); - if (index == -1) { - index = cliRBChoseIdx(pTransInst); + int idx = CONN_HOST_THREAD_IDX((SCliConn*)pReq->info.handle); + if (idx == -1) { + idx = cliRBChoseIdx(pTransInst); } + TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->hThrdIdx = index; + pCtx->hThrdIdx = idx; if (ctx != NULL) { pCtx->appCtx = *ctx; @@ -1096,28 +1107,30 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; - SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; - tDebug("%s send request at thread:%d, threadID: %08" PRId64 ", msg: %p, dst: %s:%d, app:%p", pTransInst->label, - index, thrd->pid, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), - pReq->info.ahandle); + STraceId* trace = &pReq->info.traceId; + tTR("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX(pReq->info.handle); - if (index == -1) { - index = cliRBChoseIdx(pTransInst); + int idx = CONN_HOST_THREAD_IDX(pReq->info.handle); + if (idx == -1) { + idx = cliRBChoseIdx(pTransInst); } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_init(sem, 0, 0); + TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->hThrdIdx = index; + pCtx->hThrdIdx = idx; pCtx->pSem = sem; pCtx->pRsp = pRsp; @@ -1127,16 +1140,17 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; - SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("%s send request at thread:%d, threadID:%08" PRId64 ", msg: %p, dst: %s:%d, app:%p", pTransInst->label, index, - thrd->pid, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; + + STraceId* trace = &pReq->info.traceId; + tTR("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_wait(sem); tsem_destroy(sem); taosMemoryFree(sem); } - /* * **/ @@ -1159,7 +1173,7 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { cliMsg->type = Update; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; - tDebug("%s update epset at thread:%d, threadID:%08" PRId64 "", pTransInst->label, i, thrd->pid); + tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); transSendAsync(thrd->asyncPool, &(cliMsg->q)); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 61f791df9c..8cd7f9d827 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -376,17 +376,18 @@ static void transDQTimeout(uv_timer_t* timer) { SDelayQueue* queue = timer->data; tTrace("timer %p timeout", timer); uint64_t timeout = 0; + int64_t current = taosGetTimestampMs(); do { HeapNode* minNode = heapMin(queue->heap); if (minNode == NULL) break; SDelayTask* task = container_of(minNode, SDelayTask, node); - if (task->execTime <= taosGetTimestampMs()) { + if (task->execTime <= current) { heapRemove(queue->heap, minNode); task->func(task->arg); taosMemoryFree(task); timeout = 0; } else { - timeout = task->execTime - taosGetTimestampMs(); + timeout = task->execTime - current; break; } } while (1); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 0272e4c00a..e90187ca73 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -270,11 +270,8 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = NULL; - - // transDestroyBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf); + pConn->inType = pHead->msgType; if (pConn->status == ConnNormal) { if (pHead->persist == 1) { @@ -283,16 +280,18 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } + STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tDebug("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), - ntohs(pConn->localAddr.sin_port), transMsg.contLen); + + tTR("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), + ntohs(pConn->localAddr.sin_port), transMsg.contLen); } else { - tDebug("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", pConn, - TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, - transMsg.code); + tTR("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", pConn, + TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, + transMsg.code); // no ref here } @@ -300,11 +299,14 @@ static void uvHandleReq(SSvrConn* pConn) { // 1. server application should not send resp on handle // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist - + transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId); transMsg.info.refId = pConn->refId; - tTrace("handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); + transMsg.info.traceId = pHead->traceId; + + tTR("handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); assert(transMsg.info.handle != NULL); + if (pHead->noResp == 1) { transMsg.info.refId = -1; } @@ -424,8 +426,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { - tTrace("conn %p prepare to send resp", smsg->pConn); - SSvrConn* pConn = smsg->pConn; STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { @@ -434,6 +434,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); pHead->ahandle = (uint64_t)pMsg->info.ahandle; + pHead->traceId = pMsg->info.traceId; if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; @@ -454,9 +455,11 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - tDebug("conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), - ntohs(pConn->localAddr.sin_port), len); + + STraceId* trace = &pMsg->info.traceId; + tTR("conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), + ntohs(pConn->localAddr.sin_port), len); pHead->msgLen = htonl(len); wb->base = msg; diff --git a/source/util/src/ttrace.c b/source/util/src/ttrace.c index 1bad4cf5e5..f183fd58fd 100644 --- a/source/util/src/ttrace.c +++ b/source/util/src/ttrace.c @@ -18,51 +18,62 @@ #include "tuuid.h" // clang-format off -static TdThreadOnce init = PTHREAD_ONCE_INIT; -static void * ids = NULL; -static TdThreadMutex mtx; - -void traceInit() { - ids = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - taosThreadMutexInit(&mtx, NULL); -} -void traceCreateEnv() { - taosThreadOnce(&init, traceInit); -} -void traceDestroyEnv() { - taosThreadMutexDestroy(&mtx); - taosHashCleanup(ids); -} - -STraceId traceInitId(STraceSubId *h, STraceSubId *l) { - STraceId id = *h; - id = ((id << 32) & 0xFFFFFFFF) | ((*l) & 0xFFFFFFFF); - return id; -} -void traceId2Str(STraceId *id, char *buf) { - int32_t f = (*id >> 32) & 0xFFFFFFFF; - int32_t s = (*id) & 0xFFFFFFFF; - sprintf(buf, "%d:%d", f, s); -} - -void traceSetSubId(STraceId *id, STraceSubId *subId) { - int32_t parent = ((*id >> 32) & 0xFFFFFFFF); - taosThreadMutexLock(&mtx); - taosHashPut(ids, subId, sizeof(*subId), &parent, sizeof(parent)); - taosThreadMutexUnlock(&mtx); -} - -STraceSubId traceGetParentId(STraceId *id) { - int32_t parent = ((*id >> 32) & 0xFFFFFFFF); - taosThreadMutexLock(&mtx); - STraceSubId *p = taosHashGet(ids, (void *)&parent, sizeof(parent)); - parent = *p; - taosThreadMutexUnlock(&mtx); - - return parent; -} - -STraceSubId traceGenSubId() { - return tGenIdPI32(); -} +//static TdThreadOnce init = PTHREAD_ONCE_INIT; +//static void * ids = NULL; +//static TdThreadMutex mtx; +// +//void traceInit() { +// ids = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); +// taosThreadMutexInit(&mtx, NULL); +//} +//void traceCreateEnv() { +// taosThreadOnce(&init, traceInit); +//} +//void traceDestroyEnv() { +// taosThreadMutexDestroy(&mtx); +// taosHashCleanup(ids); +//} +// +//STraceId traceInitId(STraceSubId *h, STraceSubId *l) { +// STraceId id = *h; +// id = ((id << 32) & 0xFFFFFFFF) | ((*l) & 0xFFFFFFFF); +// return id; +//} +//void traceId2Str(STraceId *id, char *buf) { +// int32_t f = (*id >> 32) & 0xFFFFFFFF; +// int32_t s = (*id) & 0xFFFFFFFF; +// sprintf(buf, "%d:%d", f, s); +//} +// +//void traceSetSubId(STraceId *id, STraceSubId *subId) { +// int32_t parent = ((*id >> 32) & 0xFFFFFFFF); +// taosThreadMutexLock(&mtx); +// taosHashPut(ids, subId, sizeof(*subId), &parent, sizeof(parent)); +// taosThreadMutexUnlock(&mtx); +//} +// +//STraceSubId traceGetParentId(STraceId *id) { +// int32_t parent = ((*id >> 32) & 0xFFFFFFFF); +// taosThreadMutexLock(&mtx); +// STraceSubId *p = taosHashGet(ids, (void *)&parent, sizeof(parent)); +// parent = *p; +// taosThreadMutexUnlock(&mtx); +// +// return parent; +//} +// +//STraceSubId traceGenSubId() { +// return tGenIdPI32(); +//} +//void traceSetRootId(STraceId *traceid, int64_t rootId) { +// traceId->rootId = rootId; +//} +//int64_t traceGetRootId(STraceId *traceId); +// +//void traceSetMsgId(STraceId *traceid, int64_t msgId); +//int64_t traceGetMsgId(STraceId *traceid); +// +//char *trace2Str(STraceId *id); +// +//void traceSetSubId(STraceId *id, int32_t *subId); // clang-format on