Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-18 11:29:57 +00:00
commit 2627a0203c
22 changed files with 423 additions and 235 deletions

View File

@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#include "tmsg.h" #include "tmsg.h"
#include "ttrace.h"
#define TAOS_CONN_SERVER 0 #define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
@ -41,10 +42,12 @@ typedef struct {
typedef struct SRpcHandleInfo { typedef struct SRpcHandleInfo {
// rpc info // rpc info
void * handle; // rpc handle returned to app void * handle; // rpc handle returned to app
int64_t refId; // refid, used by server int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not int32_t persistHandle; // persist handle or not
STraceId traceId;
// int64_t traceId;
// app info // app info
void *ahandle; // app handle set by client void *ahandle; // app handle set by client

57
include/util/ttrace.h Normal file
View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TRACE_H_
#define _TD_TRACE_H_
#include <stdint.h>
#include <stdlib.h>
#ifdef __cplusplus
extern "C" {
#endif
#pragma(push, 1)
typedef struct STraceId {
int64_t rootId;
int64_t msgId;
} STraceId;
#pragma(pop)
#define TRACE_SET_ROOTID(traceId, root) \
do { \
(traceId)->rootId = root; \
} while (0);
#define TRACE_GET_ROOTID(traceId) (traceId)->rootId
#define TRACE_SET_MSGID(traceId, mId) \
do { \
(traceId)->msgId = mId; \
} while (0)
#define TRACE_GET_MSGID(traceId) (traceId)->msgId
#define TRACE_TO_STR(traceId, buf) \
do { \
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", traceId->rootId, traceId->msgId); \
} while (0)
#ifdef __cplusplus
}
#endif
#endif

View File

@ -101,7 +101,8 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle; SDnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
dTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); STraceId * trace = &pMsg->info.traceId;
dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:

View File

@ -48,7 +48,8 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
dTrace("msg:%p, get from mnode queue", pMsg); STraceId * trace = &pMsg->info.traceId;
dGTrace("msg:%p, get from mnode queue", pMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_MON_MM_INFO: case TDMT_MON_MM_INFO:

View File

@ -31,7 +31,8 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle; SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
dTrace("msg:%p, get from vnode-mgmt queue", pMsg); STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, get from vnode-mgmt queue", pMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_MON_VM_INFO: case TDMT_MON_VM_INFO:
code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
@ -97,7 +98,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL; SRpcMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
@ -118,7 +119,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL; SRpcMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

View File

@ -40,8 +40,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = NULL; SMgmtWrapper *pWrapper = NULL;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), STraceId *trace = &pRpc->info.traceId;
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
if (pRpc->msgType == TDMT_DND_NET_TEST) { if (pRpc->msgType == TDMT_DND_NET_TEST) {
dmProcessNetTestReq(pDnode, pRpc); dmProcessNetTestReq(pDnode, pRpc);

View File

@ -34,13 +34,13 @@
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "qnode.h"
#include "monitor.h" #include "monitor.h"
#include "qnode.h"
#include "sync.h" #include "sync.h"
#include "wal.h" #include "wal.h"
#include "libs/function/function.h" #include "libs/function/function.h"
// clang-format off
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -51,6 +51,7 @@ extern "C" {
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0)
typedef enum { typedef enum {
DNODE = 0, DNODE = 0,
@ -184,3 +185,4 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
#endif #endif
#endif /*_TD_DM_INT_H_*/ #endif /*_TD_DM_INT_H_*/
// clang-format on

View File

@ -40,6 +40,8 @@ extern "C" {
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0)
// clang-format on // clang-format on
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
@ -54,7 +56,7 @@ typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct { typedef struct {
const char *name; const char * name;
MndInitFp initFp; MndInitFp initFp;
MndCleanupFp cleanupFp; MndCleanupFp cleanupFp;
} SMnodeStep; } SMnodeStep;
@ -63,7 +65,7 @@ typedef struct {
int64_t showId; int64_t showId;
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX]; ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX]; ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
SCacheObj *cache; SCacheObj * cache;
} SShowMgmt; } SShowMgmt;
typedef struct { typedef struct {
@ -100,14 +102,14 @@ typedef struct SMnode {
bool stopped; bool stopped;
bool restored; bool restored;
bool deploy; bool deploy;
char *path; char * path;
int64_t checkTime; int64_t checkTime;
SSdb *pSdb; SSdb * pSdb;
SArray *pSteps; SArray * pSteps;
SQHandle *pQuery; SQHandle * pQuery;
SHashObj *infosMeta; SHashObj * infosMeta;
SHashObj *perfsMeta; SHashObj * perfsMeta;
SWal *pWal; SWal * pWal;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt; STelemMgmt telemMgmt;

View File

@ -58,21 +58,21 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
@ -378,7 +378,7 @@ void mndStop(SMnode *pMnode) {
} }
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode * pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = 0; int32_t code = 0;
@ -540,7 +540,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
} }
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode * pMnode = pMsg->info.node;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
if (fp == NULL) { if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
@ -551,7 +551,8 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
if (mndCheckMsgContent(pMsg) != 0) return -1; if (mndCheckMsgContent(pMsg) != 0) return -1;
if (mndCheckMnodeState(pMsg) != 0) return -1; if (mndCheckMnodeState(pMsg) != 0) return -1;
mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); STraceId *trace = &pMsg->info.traceId;
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
int32_t code = (*fp)(pMsg); int32_t code = (*fp)(pMsg);
mndReleaseRpcRef(pMnode); mndReleaseRpcRef(pMnode);
@ -592,7 +593,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
if (mndAcquireRpcRef(pMnode) != 0) return -1; if (mndAcquireRpcRef(pMnode) != 0) return -1;
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs(); int64_t ms = taosGetTimestampMs();
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
@ -668,7 +669,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
tstrncpy(desc.status, "unsynced", sizeof(desc.status)); tstrncpy(desc.status, "unsynced", sizeof(desc.status));
for (int32_t i = 0; i < pVgroup->replica; ++i) { for (int32_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid * pVgid = &pVgroup->vnodeGid[i];
SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
pVnDesc->dnode_id = pVgid->dnodeId; pVnDesc->dnode_id = pVgid->dnodeId;
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role)); tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));

View File

@ -18,6 +18,7 @@
#include "sync.h" #include "sync.h"
#include "syncTools.h" #include "syncTools.h"
#include "ttrace.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -31,6 +32,7 @@ extern "C" {
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0) #define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
#define vGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param " GTID: %s", __VA_ARGS__, buf);} while(0)//#define vDye(...) do
// clang-format on // clang-format on
// vnodeCfg.c // vnodeCfg.c
@ -89,4 +91,4 @@ void vnodeSyncClose(SVnode* pVnode);
} }
#endif #endif
#endif /*_TD_VND_H_*/ #endif /*_TD_VND_H_*/

View File

@ -84,8 +84,8 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
STraceId *trace = &pMsg->info.traceId;
vInfo("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle); vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex}; SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
for (int32_t r = 0; r < req.replica; ++r) { for (int32_t r = 0; r < req.replica; ++r) {
SNodeInfo *pNode = &cfg.nodeInfo[r]; SNodeInfo *pNode = &cfg.nodeInfo[r];
@ -119,14 +119,15 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
} }
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode * pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
for (int32_t m = 0; m < numOfMsgs; m++) { for (int32_t m = 0; m < numOfMsgs; m++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
vTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
code = vnodeProcessAlterReplicaReq(pVnode, pMsg); code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
@ -149,10 +150,10 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
} }
vTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
newEpSet.inUse); newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
vTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
} }
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
@ -164,7 +165,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
@ -173,15 +174,16 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode * pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
vTrace("vgId:%d, msg:%p get from vnode-apply queue, index:%" PRId64 " type:%s handle:%p", vgId, pMsg, STraceId *trace = &pMsg->info.traceId;
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle); vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
pMsg->info.handle);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) { if (rsp.code == 0) {
@ -196,7 +198,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
vTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code); vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
@ -218,8 +220,9 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
char *syncNodeStr = sync2SimpleStr(pVnode->sync); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0; static int64_t vndTick = 0;
STraceId * trace = &pMsg->info.traceId;
if (++vndTick % 10 == 1) { if (++vndTick % 10 == 1) {
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); vGTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
} }
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr); taosMemoryFree(syncNodeStr);
@ -334,8 +337,9 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyIndex = cbMeta.index;
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " index:%" PRId64 " handle:%p", STraceId *trace = (STraceId *)&pMsg->info.traceId;
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, cbMeta.index, rpcMsg.info.handle); vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
if (rpcMsg.info.handle != NULL) { if (rpcMsg.info.handle != NULL) {
tmsgSendRsp(&rpcMsg); tmsgSendRsp(&rpcMsg);
} }
@ -344,7 +348,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data; SVnode * pVnode = pFsm->data;
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0}; char logBuf[256] = {0};

View File

@ -154,7 +154,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
.info.persistHandle = persistHandle, .info.persistHandle = persistHandle,
.code = 0}; .code = 0};
assert(pInfo->fp != NULL); assert(pInfo->fp != NULL);
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -204,14 +204,14 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
switch (pRes->msgType) { switch (pRes->msgType) {
case TDMT_VND_ALTER_TABLE: case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: { case TDMT_MND_ALTER_STB: {
tFreeSTableMetaRsp((STableMetaRsp *)pRes->res); tFreeSTableMetaRsp((STableMetaRsp*)pRes->res);
taosMemoryFreeClear(pRes->res); taosMemoryFreeClear(pRes->res);
break; break;
} }
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT: {
tFreeSSubmitRsp((SSubmitRsp*)pRes->res); tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
break; break;
} }
case TDMT_VND_QUERY: { case TDMT_VND_QUERY: {
taosArrayDestroy((SArray*)pRes->res); taosArrayDestroy((SArray*)pRes->res);
break; break;
@ -220,5 +220,3 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
qError("invalid exec result for request type %d", pRes->msgType); qError("invalid exec result for request type %d", pRes->msgType);
} }
} }

View File

@ -841,15 +841,6 @@ void syncNodeStart(SSyncNode* pSyncNode) {
syncNodeBecomeLeader(pSyncNode, "one replica start"); syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
// use this now
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
return; return;
} }
@ -1390,7 +1381,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
do { do {
char eventLog[128]; char eventLog[256];
snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p", snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
@ -1405,7 +1396,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
(pSyncNode->senders)[i]->replicaIndex = i; (pSyncNode->senders)[i]->replicaIndex = i;
do { do {
char eventLog[128]; char eventLog[256];
snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
@ -1583,6 +1574,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// start heartbeat timer // start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode); syncNodeStartHeartbeatTimer(pSyncNode);
// append noop
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
// trace log // trace log
do { do {
int32_t debugStrLen = strlen(debugStr); int32_t debugStrLen = strlen(debugStr);
@ -1608,10 +1603,6 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
// use this now
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
// do not use this // do not use this
// syncNodeEqNoop(pSyncNode); // syncNodeEqNoop(pSyncNode);
} }

View File

@ -26,6 +26,7 @@ extern "C" {
#include "transLog.h" #include "transLog.h"
#include "transportInt.h" #include "transportInt.h"
#include "trpc.h" #include "trpc.h"
#include "ttrace.h"
#include "tutil.h" #include "tutil.h"
typedef void* queue[2]; typedef void* queue[2];
@ -140,6 +141,7 @@ typedef struct {
char spi : 2; char spi : 2;
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
STraceId traceId;
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client
uint32_t code; // del later uint32_t code; // del later
uint32_t msgType; uint32_t msgType;

View File

@ -22,6 +22,7 @@ extern "C" {
// clang-format off // clang-format off
#include "tlog.h" #include "tlog.h"
#include "ttrace.h"
#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0) #define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0)
#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0) #define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0)
@ -30,6 +31,10 @@ extern "C" {
#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0) #define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0) #define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0) #define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0)
#define tGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0)
// clang-format on // clang-format on
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -163,6 +163,11 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr(thandle, ip, fqdn); transSetDefaultAddr(thandle, ip, fqdn);
} }
// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) {
// SRpcHandleInfo* pInfo = &pMsg->info;
// pInfo->traceId = uid;
//}
int32_t rpcInit() { int32_t rpcInit() {
// impl later // impl later
return 0; return 0;

View File

@ -48,9 +48,10 @@ typedef struct SCliMsg {
STransConnCtx* ctx; STransConnCtx* ctx;
STransMsg msg; STransMsg msg;
queue q; queue q;
uint64_t st;
STransMsgType type; STransMsgType type;
int sent; //(0: no send, 1: alread sent)
uint64_t st;
int sent; //(0: no send, 1: alread sent)
} SCliMsg; } SCliMsg;
typedef struct SCliThrdObj { typedef struct SCliThrdObj {
@ -167,27 +168,27 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
} while (0) } while (0)
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_IDX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
do { \ do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \ uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
conn->status = ConnRelease; \ conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ tDebug("%s conn %p receive release request, ref: %d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
if (T_REF_VAL_GET(conn) > 1) { \ if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \ transUnrefCliHandle(conn); \
} \ } \
destroyCmsg(pMsg); \ destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \ cliReleaseUnfinishedMsg(conn); \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
return; \ return; \
} \ } \
} while (0) } while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
@ -280,6 +281,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId;
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL; STransConnCtx* pCtx = NULL;
@ -293,27 +295,28 @@ void cliHandleResp(SCliConn* conn) {
if (transMsg.info.ahandle == NULL) { if (transMsg.info.ahandle == NULL) {
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
} }
tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.info.ahandle); tDebug("%s conn %p construct ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} else { } else {
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.info.ahandle); tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} }
} else { } else {
uint64_t ahandle = (uint64_t)pHead->ahandle; uint64_t ahandle = (uint64_t)pHead->ahandle;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
if (pMsg == NULL) { if (pMsg == NULL) {
transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("cli conn %p construct ahandle %p by %s, persist: 1", conn, transMsg.info.ahandle, tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(transMsg.msgType)); transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.info.ahandle); tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle);
} }
} else { } else {
pCtx = pMsg ? pMsg->ctx : NULL; pCtx = pMsg ? pMsg->ctx : NULL;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.info.ahandle); tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} }
} }
// buf's mem alread translated to transMsg.pCont // buf's mem alread translated to transMsg.pCont
@ -321,26 +324,27 @@ void cliHandleResp(SCliConn* conn) {
if (!CONN_NO_PERSIST_BY_APP(conn)) { if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.info.handle = conn; transMsg.info.handle = conn;
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
// char buf[64] = {0};
tDebug("%s cli conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pTransInst->label, conn, // TRACE_TO_STR(&transMsg.info.traceId, buf);
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), STraceId* trace = &transMsg.info.traceId;
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", conn, TMSG_INFO(pHead->msgType),
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->localAddr.sin_addr),
ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tTrace("except, server continue send while cli ignore it"); tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn));
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
} }
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
tTrace("except, server continue send while cli ignore it"); tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn));
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
} }
if (cliAppCb(conn, &transMsg, pMsg) != 0) { if (cliAppCb(conn, &transMsg, pMsg) != 0) {
tTrace("try to send req to next node");
return; return;
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
@ -363,7 +367,7 @@ void cliHandleResp(SCliConn* conn) {
void cliHandleExcept(SCliConn* pConn) { void cliHandleExcept(SCliConn* pConn) {
if (transQueueEmpty(&pConn->cliMsgs)) { if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s cli conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return; return;
} }
@ -386,11 +390,11 @@ void cliHandleExcept(SCliConn* pConn) {
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle, tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
TMSG_INFO(transMsg.msgType)); TMSG_INFO(transMsg.msgType));
if (transMsg.info.ahandle == NULL) { if (transMsg.info.ahandle == NULL) {
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
transMsg.info.ahandle); transMsg.info.ahandle);
} }
} else { } else {
@ -404,11 +408,10 @@ void cliHandleExcept(SCliConn* pConn) {
} }
} }
if (cliAppCb(pConn, &transMsg, pMsg) != 0) { if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
tTrace("try to send req to next node");
return; return;
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
} while (!transQueueEmpty(&pConn->cliMsgs)); } while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
@ -417,7 +420,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pTransInst->label); tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { while (p != NULL) {
@ -492,7 +495,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
char key[128] = {0}; char key[128] = {0};
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("%s conn %p added to conn pool, read buf cap: %d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before // list already create before
@ -515,10 +518,10 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (transReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
cliHandleResp(conn); cliHandleResp(conn);
} else { } else {
tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
} }
return; return;
} }
@ -528,11 +531,11 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
// read(2). // read(2).
tTrace("%s cli conn %p read empty", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p read empty", CONN_GET_INST_LABEL(conn), conn);
return; return;
} }
if (nread < 0) { if (nread < 0) {
tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); tError("%s conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
conn->broken = true; conn->broken = true;
cliHandleExcept(conn); cliHandleExcept(conn);
} }
@ -557,7 +560,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
return conn; return conn;
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
if (clear) { if (clear) {
@ -570,7 +573,7 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
transQueueDestroy(&conn->cliMsgs); transQueueDestroy(&conn->cliMsgs);
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
taosMemoryFree(conn); taosMemoryFree(conn);
} }
@ -597,14 +600,14 @@ static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
} else { } else {
tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); tError("%s conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
if (cliHandleNoResp(pConn) == true) { if (cliHandleNoResp(pConn) == true) {
tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
return; return;
} }
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
@ -640,11 +643,16 @@ void cliSend(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId;
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), // char buf[64] = {0};
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port)); // TRACE_TO_STR(&pMsg->info.traceId, buf);
STraceId* trace = &pMsg->info.traceId;
tGTrace("conn %p %s is sent to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
ntohs(pConn->localAddr.sin_port));
if (pHead->persist == 1) { if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
@ -662,7 +670,7 @@ void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
@ -672,7 +680,7 @@ void cliConnCb(uv_connect_t* req, int status) {
addrlen = sizeof(pConn->localAddr); addrlen = sizeof(pConn->localAddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen);
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
cliSend(pConn); cliSend(pConn);
@ -691,7 +699,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.info.handle; SCliConn* conn = pMsg->msg.info.handle;
tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
if (T_REF_VAL_GET(conn) == 2) { if (T_REF_VAL_GET(conn) == 2) {
transUnrefCliHandle(conn); transUnrefCliHandle(conn);
@ -716,15 +724,15 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (pMsg->msg.info.handle != NULL) { if (pMsg->msg.info.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.info.handle); conn = (SCliConn*)(pMsg->msg.info.handle);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
} else { } else {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
if (conn != NULL) { if (conn != NULL) {
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} else { } else {
tTrace("not found conn in conn pool %p", pThrd->pool); tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
} }
} }
return conn; return conn;
@ -750,6 +758,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
transPrintEpSet(&pCtx->epSet);
SCliConn* conn = cliGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
@ -768,11 +778,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
int ret = transSetConnOption((uv_tcp_t*)conn->stream); int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) { if (ret) {
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); tError("%s conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
} }
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT); int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
if (fd == -1) { if (fd == -1) {
tTrace("%s cli conn %p failed to create socket", pTransInst->label, conn); tTrace("%s conn %p failed to create socket", pTransInst->label, conn);
cliHandleExcept(conn); cliHandleExcept(conn);
return; return;
} }
@ -782,10 +792,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip); addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port); addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port, tTrace("%s conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret)); uv_err_name(ret));
cliHandleExcept(conn); cliHandleExcept(conn);
return; return;
@ -944,14 +954,13 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
if (pMsg == NULL || pMsg->ctx == NULL) { if (pMsg == NULL || pMsg->ctx == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); tTrace("%s conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, NULL); pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0; return 0;
} }
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SEpSet* pEpSet = &pCtx->epSet; SEpSet* pEpSet = &pCtx->epSet;
transPrintEpSet(pEpSet);
if (pCtx->retryCount == 0) { if (pCtx->retryCount == 0) {
pCtx->origEpSet = pCtx->epSet; pCtx->origEpSet = pCtx->epSet;
@ -964,6 +973,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
(pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pResp->code == TSDB_CODE_APP_NOT_READY || (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pResp->code == TSDB_CODE_APP_NOT_READY ||
pResp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pResp->code == TSDB_CODE_SYN_NOT_LEADER)) { pResp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pResp->code == TSDB_CODE_SYN_NOT_LEADER)) {
pMsg->sent = 0; pMsg->sent = 0;
tTrace("try to send req to next node");
pMsg->st = taosGetTimestampUs(); pMsg->st = taosGetTimestampUs();
pCtx->retryCount += 1; pCtx->retryCount += 1;
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
@ -974,22 +984,27 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
tTrace("use local epset, current in use: %d, retry count:%d, limit: %d", pEpSet->inUse, pCtx->retryCount + 1, transPrintEpSet(pEpSet);
pEpSet->numOfEps * 3); tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, pEpSet->numOfEps * 3);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return -1; return -1;
} }
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) { if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
tTrace("use local epset, current in use: %d, retry count:%d, limit: %d", pEpSet->inUse, pCtx->retryCount + 1, transPrintEpSet(&pCtx->epSet);
TRANS_RETRY_COUNT_LIMIT); tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
} else { } else {
SEpSet epSet = {0}; SEpSet epSet = {0};
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = epSet; pCtx->epSet = epSet;
tTrace("use remote epset, current in use: %d, retry count:%d, limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
TRANS_RETRY_COUNT_LIMIT); transPrintEpSet(&pCtx->epSet);
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
} }
addConnToPool(pThrd->pool, pConn); addConnToPool(pThrd->pool, pConn);
@ -1001,17 +1016,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
} }
} }
STraceId* trace = &pResp->info.traceId;
if (pCtx->pSem != NULL) { if (pCtx->pSem != NULL) {
tTrace("%s cli conn %p(sync) handle resp", pTransInst->label, pConn); tGTrace("conn %p(sync) handle resp", pConn);
if (pCtx->pRsp == NULL) { if (pCtx->pRsp == NULL) {
tTrace("%s cli conn %p(sync) failed to resp, ignore", pTransInst->label, pConn); tGTrace("conn %p(sync) failed to resp, ignore", pConn);
} else { } else {
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
} }
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
pCtx->pRsp = NULL; pCtx->pRsp = NULL;
} else { } else {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); tGTrace("conn %p handle resp", pConn);
if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) { if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) {
pTransInst->cfp(pTransInst->parent, pResp, NULL); pTransInst->cfp(pTransInst->parent, pResp, NULL);
} else { } else {
@ -1039,6 +1055,7 @@ void transRefCliHandle(void* handle) {
return; return;
} }
int ref = T_REF_INC((SCliConn*)handle); int ref = T_REF_INC((SCliConn*)handle);
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
UNUSED(ref); UNUSED(ref);
} }
void transUnrefCliHandle(void* handle) { void transUnrefCliHandle(void* handle) {
@ -1046,7 +1063,7 @@ void transUnrefCliHandle(void* handle) {
return; return;
} }
int ref = T_REF_DEC((SCliConn*)handle); int ref = T_REF_DEC((SCliConn*)handle);
tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref); tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
if (ref == 0) { if (ref == 0) {
cliDestroyConn((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
@ -1067,16 +1084,17 @@ void transReleaseCliHandle(void* handle) {
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->info.handle); int idx = CONN_HOST_THREAD_IDX((SCliConn*)pReq->info.handle);
if (index == -1) { if (idx == -1) {
index = cliRBChoseIdx(pTransInst); idx = cliRBChoseIdx(pTransInst);
} }
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = index; pCtx->hThrdIdx = idx;
if (ctx != NULL) { if (ctx != NULL) {
pCtx->appCtx = *ctx; pCtx->appCtx = *ctx;
@ -1089,27 +1107,30 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal; cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
tDebug("send request at thread:%d, threadID: %08" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->pid, pReq, STraceId* trace = &pReq->info.traceId;
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
} }
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->info.handle); int idx = CONN_HOST_THREAD_IDX(pReq->info.handle);
if (index == -1) { if (idx == -1) {
index = cliRBChoseIdx(pTransInst); idx = cliRBChoseIdx(pTransInst);
} }
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0); tsem_init(sem, 0, 0);
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->hThrdIdx = index; pCtx->hThrdIdx = idx;
pCtx->pSem = sem; pCtx->pSem = sem;
pCtx->pRsp = pRsp; pCtx->pRsp = pRsp;
@ -1119,16 +1140,17 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal; cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
tDebug("send request at thread:%d, threadID:%08" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->pid, pReq,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_wait(sem); tsem_wait(sem);
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
} }
/* /*
* *
**/ **/
@ -1151,7 +1173,7 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
cliMsg->type = Update; cliMsg->type = Update;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("update epset at thread:%d, threadID:%08" PRId64 "", i, thrd->pid); tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }

View File

@ -381,7 +381,6 @@ static void transDQTimeout(uv_timer_t* timer) {
HeapNode* minNode = heapMin(queue->heap); HeapNode* minNode = heapMin(queue->heap);
if (minNode == NULL) break; if (minNode == NULL) break;
SDelayTask* task = container_of(minNode, SDelayTask, node); SDelayTask* task = container_of(minNode, SDelayTask, node);
if (task->execTime <= current) { if (task->execTime <= current) {
heapRemove(queue->heap, minNode); heapRemove(queue->heap, minNode);
task->func(task->arg); task->func(task->arg);
@ -444,7 +443,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
} }
} }
tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); tTrace("timer %p put task into delay queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node); heapInsert(queue->heap, &task->node);
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
return 0; return 0;
@ -455,11 +454,17 @@ void transPrintEpSet(SEpSet* pEpSet) {
tTrace("NULL epset"); tTrace("NULL epset");
return; return;
} }
tTrace("epset begin inUse: %d", pEpSet->inUse); char buf[512] = {0};
int len = snprintf(buf, sizeof(buf), "epset { ");
for (int i = 0; i < pEpSet->numOfEps; i++) { for (int i = 0; i < pEpSet->numOfEps; i++) {
tTrace("ip: %s, port: %d", pEpSet->eps[i].fqdn, pEpSet->eps[i].port); if (i == pEpSet->numOfEps - 1) {
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} else {
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
} }
tTrace("epset end"); len += snprintf(buf + len, sizeof(buf) - len, "}");
tTrace("%s, inUse: %d", buf, pEpSet->inUse);
} }
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) { if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {

View File

@ -169,7 +169,7 @@ static bool addHandleToAcceptloop(void* arg);
conn->status = ConnRelease; \ conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \ tTrace("conn %p received release request", conn); \
\ \
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \ STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
@ -181,7 +181,7 @@ static bool addHandleToAcceptloop(void* arg);
return; \ return; \
} \ } \
if (conn->regArg.init) { \ if (conn->regArg.init) { \
tTrace("server conn %p release, notify server app", conn); \ tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \ STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \ memset(&conn->regArg, 0, sizeof(conn->regArg)); \
@ -209,25 +209,25 @@ static bool addHandleToAcceptloop(void* arg);
#define ASYNC_CHECK_HANDLE(exh1, refId) \ #define ASYNC_CHECK_HANDLE(exh1, refId) \
do { \ do { \
if (refId > 0) { \ if (refId > 0) { \
tTrace("server handle step1"); \ tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \ SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \ if (exh2 == NULL || refId != exh2->refId) { \
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, refId); \ exh2 ? exh2->refId : 0, refId); \
goto _return1; \ goto _return1; \
} \ } \
} else if (refId == 0) { \ } else if (refId == 0) { \
tTrace("server handle step2"); \ tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \ SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \ if (exh2 == NULL || refId != exh2->refId) { \
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
refId, exh2 ? exh2->refId : 0); \ exh2 ? exh2->refId : 0); \
goto _return1; \ goto _return1; \
} else { \ } else { \
refId = exh1->refId; \ refId = exh1->refId; \
} \ } \
} else if (refId < 0) { \ } else if (refId < 0) { \
tTrace("server handle step3"); \ tTrace("handle step3"); \
goto _return2; \ goto _return2; \
} \ } \
} while (0) } while (0)
@ -270,29 +270,28 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = NULL;
// transDestroyBuffer(&pConn->readBuf);
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
if (pHead->persist == 1) { if (pHead->persist == 1) {
pConn->status = ConnAcquire; pConn->status = ConnAcquire;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tDebug("server conn %p acquired by server app", pConn); tDebug("conn %p acquired by server app", pConn);
} }
} }
STraceId* trace = &pHead->traceId;
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, transMsg.code); taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
ntohs(pConn->localAddr.sin_port), transMsg.contLen);
} else { } else {
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d code:0x%x", pConn, tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
transMsg.code); transMsg.code);
// no ref here // no ref here
} }
@ -300,11 +299,14 @@ static void uvHandleReq(SSvrConn* pConn) {
// 1. server application should not send resp on handle // 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId); transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId);
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); transMsg.info.traceId = pHead->traceId;
tGTrace("handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
assert(transMsg.info.handle != NULL); assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
transMsg.info.refId = -1; transMsg.info.refId = -1;
} }
@ -328,12 +330,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); tTrace("conn %p total read: %d, current read: %d", conn, pBuf->len, (int)nread);
if (transReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("server conn %p alread read complete packet", conn); tTrace("conn %p alread read complete packet", conn);
uvHandleReq(conn); uvHandleReq(conn);
} else { } else {
tTrace("server %p read partial packet, continue to read", conn); tTrace("conn %p read partial packet, continue to read", conn);
} }
return; return;
} }
@ -341,12 +343,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
tError("server conn %p read error: %s", conn, uv_err_name(nread)); tError("conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0) { if (nread < 0) {
conn->broken = true; conn->broken = true;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (conn->regArg.init) { if (conn->regArg.init) {
tTrace("server conn %p broken, notify server app", conn); tTrace("conn %p broken, notify server app", conn);
STrans* pTransInst = conn->pTransInst; STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg)); memset(&conn->regArg, 0, sizeof(conn->regArg));
@ -363,14 +365,14 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnTimeoutCb(uv_timer_t* handle) {
// opt // opt
SSvrConn* pConn = handle->data; SSvrConn* pConn = handle->data;
tError("server conn %p time out", pConn); tError("conn %p time out", pConn);
} }
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSvrConn* conn = req->data; SSvrConn* conn = req->data;
// transClearBuffer(&conn->readBuf); // transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
tTrace("server conn %p data already was written on stream", conn); tTrace("conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
SSvrMsg* msg = transQueuePop(&conn->srvMsgs); SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
// if (msg->type == Release && conn->status != ConnNormal) { // if (msg->type == Release && conn->status != ConnNormal) {
@ -407,7 +409,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
} }
} }
} else { } else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); tError("conn %p failed to write data, %s", conn, uv_err_name(status));
conn->broken = true; conn->broken = true;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
@ -424,8 +426,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
} }
static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
tTrace("server conn %p prepare to send resp", smsg->pConn);
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
STransMsg* pMsg = &smsg->msg; STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
@ -434,6 +434,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->ahandle = (uint64_t)pMsg->info.ahandle;
pHead->traceId = pMsg->info.traceId;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1; pHead->msgType = pConn->inType + 1;
@ -454,9 +455,11 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
char* msg = (char*)pHead; char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), STraceId* trace = &pMsg->info.traceId;
ntohs(pConn->localAddr.sin_port), len); tGTrace("conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
ntohs(pConn->localAddr.sin_port), len);
pHead->msgLen = htonl(len); pHead->msgLen = htonl(len);
wb->base = msg; wb->base = msg;
@ -545,7 +548,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); SExHandle* exh2 = transAcquireExHandle(refMgt, refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
@ -583,18 +586,18 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
static void uvWorkDoTask(uv_work_t* req) { static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task // doing time-consumeing task
// only auth conn currently, add more func later // only auth conn currently, add more func later
tTrace("server conn %p start to be processed in BG Thread", req->data); tTrace("conn %p start to be processed in BG Thread", req->data);
return; return;
} }
static void uvWorkAfterTask(uv_work_t* req, int status) { static void uvWorkAfterTask(uv_work_t* req, int status) {
if (status != 0) { if (status != 0) {
tTrace("server conn %p failed to processed ", req->data); tTrace("conn %p failed to processed ", req->data);
} }
// Done time-consumeing task // Done time-consumeing task
// add more func later // add more func later
// this func called in main loop // this func called in main loop
tTrace("server conn %p already processed ", req->data); tTrace("conn %p already processed ", req->data);
taosMemoryFree(req); taosMemoryFree(req);
} }
@ -629,7 +632,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
} }
} }
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tTrace("server connection coming"); tTrace("connection coming");
if (nread < 0) { if (nread < 0) {
if (nread != UV_EOF) { if (nread != UV_EOF) {
tError("read error %s", uv_err_name(nread)); tError("read error %s", uv_err_name(nread));
@ -678,18 +681,18 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tTrace("server conn %p created, fd: %d", pConn, fd); tTrace("conn %p created, fd: %d", pConn, fd);
int addrlen = sizeof(pConn->addr); int addrlen = sizeof(pConn->addr);
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
tError("server conn %p failed to get peer info", pConn); tError("conn %p failed to get peer info", pConn);
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
addrlen = sizeof(pConn->localAddr); addrlen = sizeof(pConn->localAddr);
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) { if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) {
tError("server conn %p failed to get local info", pConn); tError("conn %p failed to get local info", pConn);
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
@ -798,7 +801,7 @@ static SSvrConn* createConn(void* hThrd) {
pConn->refId = exh->refId; pConn->refId = exh->refId;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tTrace("server handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId); tTrace("handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId);
return pConn; return pConn;
} }
@ -809,7 +812,7 @@ static void destroyConn(SSvrConn* conn, bool clear) {
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
if (clear) { if (clear) {
tTrace("server conn %p to be destroyed", conn); tTrace("conn %p to be destroyed", conn);
// uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t)); // uv_shutdown_t* req = taosMemoryMalloc(sizeof(uv_shutdown_t));
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
// uv_close(conn->pTcp) // uv_close(conn->pTcp)
@ -845,7 +848,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
transReleaseExHandle(refMgt, conn->refId); transReleaseExHandle(refMgt, conn->refId);
transRemoveExHandle(refMgt, conn->refId); transRemoveExHandle(refMgt, conn->refId);
tDebug("server conn %p destroy", conn); tDebug("conn %p destroy", conn);
// uv_timer_stop(&conn->pTimer); // uv_timer_stop(&conn->pTimer);
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
@ -974,18 +977,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
return; return;
} else if (conn->status == ConnRelease || conn->status == ConnNormal) { } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("server conn %p already released, ignore release-msg", conn); tDebug("conn %p already released, ignore release-msg", conn);
} }
destroySmsg(msg); destroySmsg(msg);
} }
void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) { void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) {
// send msg to client // send msg to client
tDebug("server conn %p start to send resp (2/2)", msg->pConn); tDebug("conn %p start to send resp (2/2)", msg->pConn);
uvStartSendResp(msg); uvStartSendResp(msg);
} }
void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) { void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
SSvrConn* conn = msg->pConn; SSvrConn* conn = msg->pConn;
tDebug("server conn %p register brokenlink callback", conn); tDebug("conn %p register brokenlink callback", conn);
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) { if (!transQueuePush(&conn->srvMsgs, msg)) {
return; return;
@ -994,7 +997,7 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
conn->regArg.notifyCount = 0; conn->regArg.notifyCount = 0;
conn->regArg.init = 1; conn->regArg.init = 1;
conn->regArg.msg = msg->msg; conn->regArg.msg = msg->msg;
tDebug("server conn %p register brokenlink callback succ", conn); tDebug("conn %p register brokenlink callback succ", conn);
if (conn->broken) { if (conn->broken) {
STrans* pTransInst = conn->pTransInst; STrans* pTransInst = conn->pTransInst;
@ -1062,7 +1065,7 @@ void transRefSrvHandle(void* handle) {
return; return;
} }
int ref = T_REF_INC((SSvrConn*)handle); int ref = T_REF_INC((SSvrConn*)handle);
tDebug("server conn %p ref count: %d", handle, ref); tDebug("conn %p ref count: %d", handle, ref);
} }
void transUnrefSrvHandle(void* handle) { void transUnrefSrvHandle(void* handle) {
@ -1070,7 +1073,7 @@ void transUnrefSrvHandle(void* handle) {
return; return;
} }
int ref = T_REF_DEC((SSvrConn*)handle); int ref = T_REF_DEC((SSvrConn*)handle);
tDebug("server conn %p ref count: %d", handle, ref); tDebug("conn %p ref count: %d", handle, ref);
if (ref == 0) { if (ref == 0) {
destroyConn((SSvrConn*)handle, true); destroyConn((SSvrConn*)handle, true);
} }
@ -1091,16 +1094,16 @@ void transReleaseSrvHandle(void* handle) {
m->msg = tmsg; m->msg = tmsg;
m->type = Release; m->type = Release;
tTrace("server conn %p start to release", exh->handle); tTrace("conn %p start to release", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
tTrace("server handle %p failed to send to release handle", exh); tTrace("handle %p failed to send to release handle", exh);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return2: _return2:
tTrace("server handle %p failed to send to release handle", exh); tTrace("handle %p failed to send to release handle", exh);
return; return;
} }
void transSendResponse(const STransMsg* msg) { void transSendResponse(const STransMsg* msg) {
@ -1118,17 +1121,19 @@ void transSendResponse(const STransMsg* msg) {
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
m->msg = tmsg; m->msg = tmsg;
m->type = Normal; m->type = Normal;
tDebug("server conn %p start to send resp (1/2)", exh->handle);
STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
tTrace("server handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return2: _return2:
tTrace("server handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
return; return;
} }
@ -1146,18 +1151,19 @@ void transRegisterMsg(const STransMsg* msg) {
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
m->msg = tmsg; m->msg = tmsg;
m->type = Register; m->type = Register;
tTrace("server conn %p start to register brokenlink callback", exh->handle);
tTrace("conn %p start to register brokenlink callback", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
tTrace("server handle %p failed to send to register brokenlink", exh); tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return2: _return2:
tTrace("server handle %p failed to send to register brokenlink", exh); tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
} }

View File

@ -56,7 +56,7 @@ typedef struct SHashEntry {
} SHashEntry; } SHashEntry;
struct SHashObj { struct SHashObj {
SHashEntry **hashList; SHashEntry ** hashList;
size_t capacity; // number of slots size_t capacity; // number of slots
int64_t size; // number of elements in hash table int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function _hash_fn_t hashFp; // hash function
@ -65,7 +65,7 @@ struct SHashObj {
SRWLatch lock; // read-write spin lock SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type SHashLockTypeE type; // lock type
bool enableUpdate; // enable update bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry SArray * pMemBlock; // memory block allocated for SHashEntry
_hash_before_fn_t callbackFp; // function invoked before return the value to caller _hash_before_fn_t callbackFp; // function invoked before return the value to caller
}; };
@ -633,7 +633,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); void * pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (pNewEntryList == NULL) { if (pNewEntryList == NULL) {
// uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); // uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return; return;
@ -642,7 +642,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
pHashObj->hashList = pNewEntryList; pHashObj->hashList = pNewEntryList;
size_t inc = newCapacity - pHashObj->capacity; size_t inc = newCapacity - pHashObj->capacity;
void *p = taosMemoryCalloc(inc, sizeof(SHashEntry)); void * p = taosMemoryCalloc(inc, sizeof(SHashEntry));
for (int32_t i = 0; i < inc; ++i) { for (int32_t i = 0; i < inc; ++i) {
pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry)); pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry));
@ -653,9 +653,9 @@ void taosHashTableResize(SHashObj *pHashObj) {
pHashObj->capacity = newCapacity; pHashObj->capacity = newCapacity;
for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) { for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) {
SHashEntry *pe = pHashObj->hashList[idx]; SHashEntry *pe = pHashObj->hashList[idx];
SHashNode *pNode; SHashNode * pNode;
SHashNode *pNext; SHashNode * pNext;
SHashNode *pPrev = NULL; SHashNode * pPrev = NULL;
if (pe->num == 0) { if (pe->num == 0) {
assert(pe->next == NULL); assert(pe->next == NULL);

79
source/util/src/ttrace.c Normal file
View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ttrace.h"
#include "taos.h"
#include "thash.h"
#include "tuuid.h"
// clang-format off
//static TdThreadOnce init = PTHREAD_ONCE_INIT;
//static void * ids = NULL;
//static TdThreadMutex mtx;
//
//void traceInit() {
// ids = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
// taosThreadMutexInit(&mtx, NULL);
//}
//void traceCreateEnv() {
// taosThreadOnce(&init, traceInit);
//}
//void traceDestroyEnv() {
// taosThreadMutexDestroy(&mtx);
// taosHashCleanup(ids);
//}
//
//STraceId traceInitId(STraceSubId *h, STraceSubId *l) {
// STraceId id = *h;
// id = ((id << 32) & 0xFFFFFFFF) | ((*l) & 0xFFFFFFFF);
// return id;
//}
//void traceId2Str(STraceId *id, char *buf) {
// int32_t f = (*id >> 32) & 0xFFFFFFFF;
// int32_t s = (*id) & 0xFFFFFFFF;
// sprintf(buf, "%d:%d", f, s);
//}
//
//void traceSetSubId(STraceId *id, STraceSubId *subId) {
// int32_t parent = ((*id >> 32) & 0xFFFFFFFF);
// taosThreadMutexLock(&mtx);
// taosHashPut(ids, subId, sizeof(*subId), &parent, sizeof(parent));
// taosThreadMutexUnlock(&mtx);
//}
//
//STraceSubId traceGetParentId(STraceId *id) {
// int32_t parent = ((*id >> 32) & 0xFFFFFFFF);
// taosThreadMutexLock(&mtx);
// STraceSubId *p = taosHashGet(ids, (void *)&parent, sizeof(parent));
// parent = *p;
// taosThreadMutexUnlock(&mtx);
//
// return parent;
//}
//
//STraceSubId traceGenSubId() {
// return tGenIdPI32();
//}
//void traceSetRootId(STraceId *traceid, int64_t rootId) {
// traceId->rootId = rootId;
//}
//int64_t traceGetRootId(STraceId *traceId);
//
//void traceSetMsgId(STraceId *traceid, int64_t msgId);
//int64_t traceGetMsgId(STraceId *traceid);
//
//char *trace2Str(STraceId *id);
//
//void traceSetSubId(STraceId *id, int32_t *subId);
// clang-format on

View File

@ -49,7 +49,7 @@ int64_t tGenIdPI64(void) {
} }
int64_t id; int64_t id;
while (true) { while (true) {
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId(); uint64_t pid = taosGetPId();