refactor: add gtid for trace
This commit is contained in:
parent
54cb6b3e85
commit
ca0d858a00
|
@ -28,10 +28,10 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
int32_t code = -1;
|
||||
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
int32_t code = -1;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, get from vnode-mgmt queue", pMsg);
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_MON_VM_INFO:
|
||||
|
@ -48,50 +48,52 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||
}
|
||||
|
||||
if (IsReq(pMsg)) {
|
||||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||
dGError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||
}
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg);
|
||||
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
|
||||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
dError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr());
|
||||
dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr());
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
dError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
|
||||
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
@ -102,16 +104,17 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
dTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
|
||||
|
||||
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
|
||||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr());
|
||||
dGError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr());
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
@ -123,55 +126,57 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
dTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg);
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg);
|
||||
|
||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
|
||||
dGError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
return terrno != 0 ? terrno : -1;
|
||||
}
|
||||
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
|
||||
dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
dTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
break;
|
||||
case WRITE_QUEUE:
|
||||
dTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||
break;
|
||||
case SYNC_QUEUE:
|
||||
dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||
break;
|
||||
default:
|
||||
|
@ -193,13 +198,15 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
|
|||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
dTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
||||
taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
dTrace("msg:%p, put into vnode-monitor queue", pMsg);
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-monitor queue", pMsg);
|
||||
taosWriteQitem(pMgmt->monitorWorker.queue, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue