Merge pull request #14057 from taosdata/fix/mnode
refactor: add gtid to log
This commit is contained in:
commit
55c524e811
|
@ -127,7 +127,7 @@ int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) {
|
||||||
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
|
||||||
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
|
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
|
if (tEncodeCStrWithLen(pEncoder, pEp->eps[i].fqdn, TSDB_FQDN_LEN) < 0) return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3054,7 +3054,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
|
||||||
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->passwd) < 0) return -1;
|
if (tEncodeCStrWithLen(&encoder, pReq->passwd, TSDB_PASSWORD_LEN) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,7 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
SDCreateMnodeReq createReq = {0};
|
SDCreateMnodeReq createReq = {0};
|
||||||
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -81,7 +82,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
if (createReq.replica != 1) {
|
if (createReq.replica != 1) {
|
||||||
terrno = TSDB_CODE_INVALID_OPTION;
|
terrno = TSDB_CODE_INVALID_OPTION;
|
||||||
dError("failed to create mnode since %s", terrstr());
|
dGError("failed to create mnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +92,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
mgmt.path = pInput->path;
|
mgmt.path = pInput->path;
|
||||||
mgmt.name = pInput->name;
|
mgmt.name = pInput->name;
|
||||||
if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) {
|
if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) {
|
||||||
dError("failed to write mnode file since %s", terrstr());
|
dGError("failed to write mnode file since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +100,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
SDDropMnodeReq dropReq = {0};
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
SDDropMnodeReq dropReq = {0};
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -107,7 +109,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
|
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
|
||||||
terrno = TSDB_CODE_INVALID_OPTION;
|
terrno = TSDB_CODE_INVALID_OPTION;
|
||||||
dError("failed to drop mnode since %s", terrstr());
|
dGError("failed to drop mnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +119,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
|
||||||
mgmt.path = pInput->path;
|
mgmt.path = pInput->path;
|
||||||
mgmt.name = pInput->name;
|
mgmt.name = pInput->name;
|
||||||
if (mmWriteFile(&mgmt, NULL, deployed) != 0) {
|
if (mmWriteFile(&mgmt, NULL, deployed) != 0) {
|
||||||
dError("failed to write mnode file since %s", terrstr());
|
dGError("failed to write mnode file since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
|
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
if (pMgmt->stopped) {
|
if (pMgmt->stopped) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -48,7 +47,8 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STraceId * trace = &pMsg->info.traceId;
|
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
dGTrace("msg:%p, get from mnode queue", pMsg);
|
dGTrace("msg:%p, get from mnode queue", pMsg);
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
mndPostProcessQueryMsg(pMsg);
|
mndPostProcessQueryMsg(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,9 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
pMsg->info.node = pMgmt->pMnode;
|
pMsg->info.node = pMgmt->pMnode;
|
||||||
dTrace("msg:%p, get from mnode-sync queue", pMsg);
|
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
dGTrace("msg:%p, get from mnode-sync queue", pMsg);
|
||||||
|
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
pHead->contLen = ntohl(pHead->contLen);
|
pHead->contLen = ntohl(pHead->contLen);
|
||||||
|
@ -88,20 +90,22 @@ static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t code = mndProcessSyncMsg(pMsg);
|
int32_t code = mndProcessSyncMsg(pMsg);
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
|
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
if (mmAcquire(pMgmt) == 0) {
|
if (mmAcquire(pMgmt) == 0) {
|
||||||
dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
|
dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
|
||||||
taosWriteQitem(pWorker->queue, pMsg);
|
taosWriteQitem(pWorker->queue, pMsg);
|
||||||
mmRelease(pMgmt);
|
mmRelease(pMgmt);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(),
|
dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(),
|
||||||
TMSG_INFO(pMsg->msgType));
|
TMSG_INFO(pMsg->msgType));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,19 +125,17 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
pMsg->info.node = pMgmt->pMnode;
|
pMsg->info.node = pMgmt->pMnode;
|
||||||
if (mndPreProcessQueryMsg(pMsg) != 0) {
|
if (mndPreProcessQueryMsg(pMsg) != 0) {
|
||||||
dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
|
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
pMsg->info.node = pMgmt->pMnode;
|
|
||||||
|
|
||||||
return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
|
return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg);
|
return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,44 @@
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
|
|
||||||
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
static inline void dmSendRsp(SRpcMsg *pMsg) {
|
||||||
static void dmSendRsp(SRpcMsg *pMsg);
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
|
if (InChildProc(pWrapper)) {
|
||||||
|
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
||||||
|
} else {
|
||||||
|
rpcSendResponse(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
|
||||||
|
|
||||||
|
const int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
||||||
|
pMsg->pCont = rpcMallocCont(contLen);
|
||||||
|
if (pMsg->pCont == NULL) {
|
||||||
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
|
||||||
|
pMsg->contLen = contLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||||
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||||
|
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
||||||
|
|
||||||
|
rsp.pCont = rpcMallocCont(contLen);
|
||||||
|
if (rsp.pCont == NULL) {
|
||||||
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
|
||||||
|
rsp.contLen = contLen;
|
||||||
|
}
|
||||||
|
dmSendRsp(&rsp);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
|
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
|
||||||
|
@ -28,31 +63,38 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
|
||||||
pMsg->info.wrapper = pWrapper;
|
pMsg->info.wrapper = pWrapper;
|
||||||
return (*msgFp)(pWrapper->pMgmt, pMsg);
|
return (*msgFp)(pWrapper->pMgmt, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
SDnodeTrans * pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SRpcMsg * pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
SMgmtWrapper *pWrapper = NULL;
|
SMgmtWrapper *pWrapper = NULL;
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||||
|
|
||||||
STraceId *trace = &pRpc->info.traceId;
|
const STraceId *trace = &pRpc->info.traceId;
|
||||||
dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
||||||
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
||||||
|
|
||||||
if (pRpc->msgType == TDMT_DND_NET_TEST) {
|
switch (pRpc->msgType) {
|
||||||
dmProcessNetTestReq(pDnode, pRpc);
|
case TDMT_DND_NET_TEST:
|
||||||
return;
|
dmProcessNetTestReq(pDnode, pRpc);
|
||||||
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
|
return;
|
||||||
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
|
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
|
||||||
return;
|
case TDMT_VND_FETCH_RSP:
|
||||||
} else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) {
|
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
|
||||||
dmSetMnodeEpSet(&pDnode->data, pEpSet);
|
return;
|
||||||
} else {
|
case TDMT_MND_STATUS_RSP:
|
||||||
|
if (pEpSet != NULL) {
|
||||||
|
dmSetMnodeEpSet(&pDnode->data, pEpSet);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDnode->status != DND_STAT_RUNNING) {
|
if (pDnode->status != DND_STAT_RUNNING) {
|
||||||
|
@ -73,39 +115,43 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
if (pHandle->defaultNtype == NODE_END) {
|
if (pHandle->defaultNtype == NODE_END) {
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
}
|
||||||
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
|
||||||
if (pHandle->needCheckVgId) {
|
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
||||||
if (pRpc->contLen > 0) {
|
if (pHandle->needCheckVgId) {
|
||||||
SMsgHead *pHead = pRpc->pCont;
|
if (pRpc->contLen > 0) {
|
||||||
int32_t vgId = ntohl(pHead->vgId);
|
const SMsgHead *pHead = pRpc->pCont;
|
||||||
if (vgId == QNODE_HANDLE) {
|
const int32_t vgId = ntohl(pHead->vgId);
|
||||||
|
switch (vgId) {
|
||||||
|
case QNODE_HANDLE:
|
||||||
pWrapper = &pDnode->wrappers[QNODE];
|
pWrapper = &pDnode->wrappers[QNODE];
|
||||||
} else if (vgId == SNODE_HANDLE) {
|
break;
|
||||||
|
case SNODE_HANDLE:
|
||||||
pWrapper = &pDnode->wrappers[SNODE];
|
pWrapper = &pDnode->wrappers[SNODE];
|
||||||
} else if (vgId == MNODE_HANDLE) {
|
break;
|
||||||
|
case MNODE_HANDLE:
|
||||||
pWrapper = &pDnode->wrappers[MNODE];
|
pWrapper = &pDnode->wrappers[MNODE];
|
||||||
} else {
|
break;
|
||||||
}
|
default:
|
||||||
} else {
|
break;
|
||||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dmMarkWrapper(pWrapper) != 0) {
|
if (dmMarkWrapper(pWrapper) != 0) {
|
||||||
pWrapper = NULL;
|
pWrapper = NULL;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
|
||||||
pRpc->info.wrapper = pWrapper;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRpc->info.wrapper = pWrapper;
|
||||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||||
if (pMsg == NULL) goto _OVER;
|
if (pMsg == NULL) goto _OVER;
|
||||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
|
||||||
|
|
||||||
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
|
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||||
|
dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
|
||||||
|
|
||||||
if (InParentProc(pWrapper)) {
|
if (InParentProc(pWrapper)) {
|
||||||
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
|
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
|
||||||
|
@ -115,13 +161,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dTrace("failed to process msg:%p since %s, handle:%p", pMsg, terrstr(), pRpc->info.handle);
|
|
||||||
|
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
|
dGTrace("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||||
|
|
||||||
if (IsReq(pRpc)) {
|
if (IsReq(pRpc)) {
|
||||||
SRpcMsg rsp = {.code = code, .info = pRpc->info};
|
SRpcMsg rsp = {.code = code, .info = pRpc->info};
|
||||||
|
|
||||||
if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
|
if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
|
||||||
pRpc->msgType < TDMT_VND_MSG) {
|
pRpc->msgType < TDMT_VND_MSG) {
|
||||||
dmBuildMnodeRedirectRsp(pDnode, &rsp);
|
dmBuildMnodeRedirectRsp(pDnode, &rsp);
|
||||||
|
@ -135,7 +179,7 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
if (pMsg != NULL) {
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dGTrace("msg:%p, is freed", pMsg);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
@ -149,11 +193,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
|
|
||||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
SArray * pArray = (*pWrapper->func.getHandlesFp)();
|
SArray *pArray = (*pWrapper->func.getHandlesFp)();
|
||||||
if (pArray == NULL) return -1;
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
|
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
||||||
if (pMgmt->needCheckVgId) {
|
if (pMgmt->needCheckVgId) {
|
||||||
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
||||||
|
@ -184,45 +228,6 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void dmSendRsp(SRpcMsg *pMsg) {
|
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
|
||||||
if (InChildProc(pWrapper)) {
|
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
|
||||||
} else {
|
|
||||||
rpcSendResponse(pMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
|
||||||
SEpSet epSet = {0};
|
|
||||||
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
|
||||||
pMsg->pCont = rpcMallocCont(contLen);
|
|
||||||
if (pMsg->pCont == NULL) {
|
|
||||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
} else {
|
|
||||||
tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
|
|
||||||
pMsg->contLen = contLen;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
|
||||||
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
|
||||||
|
|
||||||
rsp.pCont = rpcMallocCont(contLen);
|
|
||||||
if (rsp.pCont == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
} else {
|
|
||||||
tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
|
|
||||||
rsp.contLen = contLen;
|
|
||||||
}
|
|
||||||
dmSendRsp(&rsp);
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
if (InChildProc(pWrapper)) {
|
if (InChildProc(pWrapper)) {
|
||||||
|
|
|
@ -40,18 +40,27 @@
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
#include "libs/function/function.h"
|
#include "libs/function/function.h"
|
||||||
// clang-format off
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
// clang-format off
|
||||||
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
|
||||||
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||||
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||||
#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0)
|
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||||
|
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
|
#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
DNODE = 0,
|
DNODE = 0,
|
||||||
|
@ -185,4 +194,3 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_DM_INT_H_*/
|
#endif /*_TD_DM_INT_H_*/
|
||||||
// clang-format on
|
|
||||||
|
|
|
@ -102,6 +102,7 @@ int32_t Testbase::SendShowReq(int8_t showType, const char* tb, const char* db) {
|
||||||
ASSERT(pRsp->pCont != nullptr);
|
ASSERT(pRsp->pCont != nullptr);
|
||||||
|
|
||||||
if (pRsp->contLen == 0) return -1;
|
if (pRsp->contLen == 0) return -1;
|
||||||
|
if (pRsp->code != 0) return -1;
|
||||||
|
|
||||||
showRsp = (SRetrieveMetaTableRsp*)pRsp->pCont;
|
showRsp = (SRetrieveMetaTableRsp*)pRsp->pCont;
|
||||||
showRsp->handle = htobe64(showRsp->handle); // show Id
|
showRsp->handle = htobe64(showRsp->handle); // show Id
|
||||||
|
|
|
@ -34,13 +34,20 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
|
||||||
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||||
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||||
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
||||||
#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0)
|
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
|
#define mGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define mGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define mGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define mGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define mGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define mGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -59,22 +59,28 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
|
||||||
static void mndPullupTrans(SMnode *pMnode) {
|
static void mndPullupTrans(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
if (pReq != NULL) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCalMqRebalance(SMnode *pMnode) {
|
static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
if (pReq != NULL) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndPullupTelem(SMnode *pMnode) {
|
static void mndPullupTelem(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
if (pReq != NULL) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndPushTtlTime(SMnode *pMnode) {
|
static void mndPushTtlTime(SMnode *pMnode) {
|
||||||
|
@ -89,10 +95,11 @@ static void mndPushTtlTime(SMnode *pMnode) {
|
||||||
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
|
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
|
||||||
SMsgHead *pHead = rpcMallocCont(contLen);
|
SMsgHead *pHead = rpcMallocCont(contLen);
|
||||||
if (pHead == NULL) {
|
if (pHead == NULL) {
|
||||||
mError("ttl time malloc err. contLen:%d", contLen);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pHead->contLen = htonl(contLen);
|
pHead->contLen = htonl(contLen);
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
|
|
||||||
|
@ -100,13 +107,13 @@ static void mndPushTtlTime(SMnode *pMnode) {
|
||||||
*(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
|
*(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
|
||||||
|
|
||||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("ttl time seed err. code:%d", code);
|
mError("failed to send ttl time seed msg, code:0x%x", code);
|
||||||
|
} else {
|
||||||
|
mInfo("send ttl time seed msg, time:%d", t);
|
||||||
}
|
}
|
||||||
mError("ttl time seed succ. time:%d", t);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -117,7 +124,7 @@ static void *mndThreadFp(void *param) {
|
||||||
setThreadName("mnode-timer");
|
setThreadName("mnode-timer");
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (lastTime % (864000) == 0) { // sleep 1 day for ttl
|
if (lastTime % 864000 == 0) {
|
||||||
mndPushTtlTime(pMnode);
|
mndPushTtlTime(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -549,23 +556,25 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||||
if (!IsReq(pMsg)) return 0;
|
if (!IsReq(pMsg)) return 0;
|
||||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||||
|
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
|
||||||
|
pMsg->msgType == TDMT_MND_TRANS_TIMER) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
pMsg->msgType != TDMT_MND_TRANS_TIMER) {
|
mGError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
|
||||||
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
|
|
||||||
|
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
mndGetMnodeEpSet(pMsg->info.node, &epSet);
|
mndGetMnodeEpSet(pMsg->info.node, &epSet);
|
||||||
|
|
||||||
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
||||||
pMsg->info.rsp = rpcMallocCont(contLen);
|
pMsg->info.rsp = rpcMallocCont(contLen);
|
||||||
if (pMsg->info.rsp != NULL) {
|
if (pMsg->info.rsp != NULL) {
|
||||||
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
|
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
|
||||||
pMsg->info.rspLen = contLen;
|
pMsg->info.rspLen = contLen;
|
||||||
terrno = TSDB_CODE_RPC_REDIRECT;
|
terrno = TSDB_CODE_RPC_REDIRECT;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -575,17 +584,20 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
|
||||||
if (!IsReq(pMsg)) return 0;
|
if (!IsReq(pMsg)) return 0;
|
||||||
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
|
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
|
||||||
|
|
||||||
mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
|
||||||
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -593,18 +605,17 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
||||||
if (mndCheckMsgContent(pMsg) != 0) return -1;
|
if (mndCheckMsgContent(pMsg) != 0) return -1;
|
||||||
if (mndCheckMnodeState(pMsg) != 0) return -1;
|
if (mndCheckMnodeState(pMsg) != 0) return -1;
|
||||||
|
|
||||||
STraceId *trace = &pMsg->info.traceId;
|
|
||||||
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||||
int32_t code = (*fp)(pMsg);
|
int32_t code = (*fp)(pMsg);
|
||||||
mndReleaseRpcRef(pMnode);
|
mndReleaseRpcRef(pMnode);
|
||||||
|
|
||||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mTrace("msg:%p, won't response immediately since in progress", pMsg);
|
mGTrace("msg:%p, won't response immediately since in progress", pMsg);
|
||||||
} else if (code == 0) {
|
} else if (code == 0) {
|
||||||
mTrace("msg:%p, successfully processed", pMsg);
|
mGTrace("msg:%p, successfully processed", pMsg);
|
||||||
} else {
|
} else {
|
||||||
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
|
mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
|
||||||
TMSG_INFO(pMsg->msgType));
|
TMSG_INFO(pMsg->msgType));
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -620,7 +631,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
|
||||||
// Note: uid 0 is reserved
|
// Note: uid 0 is reserved
|
||||||
int64_t mndGenerateUid(char *name, int32_t len) {
|
int64_t mndGenerateUid(char *name, int32_t len) {
|
||||||
int32_t hashval = MurmurHash3_32(name, len);
|
int32_t hashval = MurmurHash3_32(name, len);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
int64_t us = taosGetTimestampUs();
|
int64_t us = taosGetTimestampUs();
|
||||||
int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
|
int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
|
||||||
|
|
|
@ -122,30 +122,33 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
|
||||||
int32_t pid, const char *app, int64_t startTime) {
|
int32_t pid, const char *app, int64_t startTime) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
char connStr[255] = {0};
|
char connStr[255] = {0};
|
||||||
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
|
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
|
||||||
uint32_t connId = mndGenerateUid(connStr, len);
|
uint32_t connId = mndGenerateUid(connStr, len);
|
||||||
if (startTime == 0) startTime = taosGetTimestampMs();
|
if (startTime == 0) startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
SConnObj connObj = {.id = connId,
|
SConnObj connObj = {
|
||||||
.connType = connType,
|
.id = connId,
|
||||||
.appStartTimeMs = startTime,
|
.connType = connType,
|
||||||
.pid = pid,
|
.appStartTimeMs = startTime,
|
||||||
.ip = ip,
|
.pid = pid,
|
||||||
.port = port,
|
.ip = ip,
|
||||||
.killed = 0,
|
.port = port,
|
||||||
.loginTimeMs = taosGetTimestampMs(),
|
.killed = 0,
|
||||||
.lastAccessTimeMs = 0,
|
.loginTimeMs = taosGetTimestampMs(),
|
||||||
.killId = 0,
|
.lastAccessTimeMs = 0,
|
||||||
.numOfQueries = 0,
|
.killId = 0,
|
||||||
.pQueries = NULL};
|
.numOfQueries = 0,
|
||||||
|
.pQueries = NULL,
|
||||||
|
};
|
||||||
|
|
||||||
connObj.lastAccessTimeMs = connObj.loginTimeMs;
|
connObj.lastAccessTimeMs = connObj.loginTimeMs;
|
||||||
tstrncpy(connObj.user, user, TSDB_USER_LEN);
|
tstrncpy(connObj.user, user, TSDB_USER_LEN);
|
||||||
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
|
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
|
||||||
|
|
||||||
int32_t keepTime = tsShellActivityTimer * 3;
|
int32_t keepTime = tsShellActivityTimer * 3;
|
||||||
SConnObj *pConn = taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
|
SConnObj *pConn =
|
||||||
|
taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
|
mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
|
||||||
|
@ -174,7 +177,6 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pConn->lastAccessTimeMs = taosGetTimestampMs();
|
pConn->lastAccessTimeMs = taosGetTimestampMs();
|
||||||
|
|
||||||
mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
|
mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn);
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
@ -207,13 +209,14 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj *pUser = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SConnObj *pConn = NULL;
|
SConnObj *pConn = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SConnectReq connReq = {0};
|
SConnectReq connReq = {0};
|
||||||
char ip[30] = {0};
|
char ip[30] = {0};
|
||||||
|
const STraceId *trace = &pReq->info.traceId;
|
||||||
|
|
||||||
if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
|
if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -224,11 +227,11 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
|
mGError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
|
mGError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
|
||||||
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
|
@ -239,8 +242,8 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
pDb = mndAcquireDb(pMnode, db);
|
pDb = mndAcquireDb(pMnode, db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||||
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
||||||
terrstr());
|
terrstr());
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -248,7 +251,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
|
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
|
||||||
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
|
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
|
mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
|
||||||
goto CONN_OVER;
|
goto CONN_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +276,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
||||||
pReq->info.rspLen = contLen;
|
pReq->info.rspLen = contLen;
|
||||||
pReq->info.rsp = pRsp;
|
pReq->info.rsp = pRsp;
|
||||||
|
|
||||||
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
|
mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
@ -302,7 +305,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq) {
|
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
|
||||||
SAppObj app;
|
SAppObj app;
|
||||||
|
@ -314,22 +317,19 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq)
|
||||||
memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
|
memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
|
||||||
app.lastAccessTimeMs = taosGetTimestampMs();
|
app.lastAccessTimeMs = taosGetTimestampMs();
|
||||||
|
|
||||||
int32_t keepTime = tsShellActivityTimer * 3;
|
const int32_t keepTime = tsShellActivityTimer * 3;
|
||||||
SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
|
SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
|
||||||
if (pApp == NULL) {
|
if (pApp == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
|
mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("app %" PRIx64 " is put into cache", pReq->appId);
|
mTrace("app %" PRIx64 " is put into cache", pReq->appId);
|
||||||
return pApp;
|
return pApp;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndFreeApp(SAppObj *pApp) {
|
static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); }
|
||||||
mTrace("app %" PRIx64 " is destroyed", pApp->appId);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
|
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
|
@ -356,7 +356,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
|
||||||
|
|
||||||
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
|
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
|
||||||
SAppObj *pApp = NULL;
|
SAppObj *pApp = NULL;
|
||||||
bool hasNext = taosCacheIterNext(pIter);
|
bool hasNext = taosCacheIterNext(pIter);
|
||||||
if (hasNext) {
|
if (hasNext) {
|
||||||
size_t dataLen = 0;
|
size_t dataLen = 0;
|
||||||
pApp = taosCacheIterGetData(pIter, &dataLen);
|
pApp = taosCacheIterGetData(pIter, &dataLen);
|
||||||
|
@ -439,8 +439,8 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
|
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
|
||||||
SAppHbReq* pReq = &pHbReq->app;
|
SAppHbReq *pReq = &pHbReq->app;
|
||||||
SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId);
|
SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId);
|
||||||
if (pApp == NULL) {
|
if (pApp == NULL) {
|
||||||
pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
|
pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq);
|
||||||
if (pApp == NULL) {
|
if (pApp == NULL) {
|
||||||
|
@ -448,7 +448,7 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
mDebug("a new app %" PRIx64 "created", pReq->appId);
|
mDebug("a new app %" PRIx64 "created", pReq->appId);
|
||||||
mndReleaseApp(pMnode, pApp);
|
mndReleaseApp(pMnode, pApp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -464,7 +464,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
||||||
SClientHbBatchRsp *pBatchRsp) {
|
SClientHbBatchRsp *pBatchRsp) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
|
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
|
||||||
SRpcConnInfo connInfo = pMsg->info.conn;
|
SRpcConnInfo connInfo = pMsg->info.conn;
|
||||||
|
|
||||||
mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
|
mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
|
||||||
|
|
||||||
|
@ -637,9 +637,9 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
|
mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
|
||||||
int32_t connId = 0;
|
int32_t connId = 0;
|
||||||
uint64_t queryId = 0;
|
uint64_t queryId = 0;
|
||||||
char* p = strchr(killReq.queryStrId, ':');
|
char *p = strchr(killReq.queryStrId, ':');
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
mError("invalid query id %s", killReq.queryStrId);
|
mError("invalid query id %s", killReq.queryStrId);
|
||||||
terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
|
terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
|
||||||
|
@ -853,12 +853,12 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
SAppObj *pApp = NULL;
|
SAppObj *pApp = NULL;
|
||||||
|
|
||||||
if (pShow->pIter == NULL) {
|
if (pShow->pIter == NULL) {
|
||||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||||
pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
|
pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
|
||||||
|
@ -931,7 +931,6 @@ static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
|
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
|
||||||
if (pIter != NULL) {
|
if (pIter != NULL) {
|
||||||
taosCacheDestroyIter(pIter);
|
taosCacheDestroyIter(pIter);
|
||||||
|
|
|
@ -26,13 +26,21 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
#define vFatal(...) { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||||
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
#define vError(...) { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||||
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
#define vWarn(...) { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||||
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
|
#define vInfo(...) { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||||
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }}
|
||||||
#define vGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param " GTID: %s", __VA_ARGS__, buf);} while(0)//#define vDye(...) do
|
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
|
#define vGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define vGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vError(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define vGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define vGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define vGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
#define vGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
// vnodeCfg.c
|
// vnodeCfg.c
|
||||||
|
|
|
@ -24,16 +24,16 @@ extern "C" {
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "ttrace.h"
|
#include "ttrace.h"
|
||||||
|
|
||||||
#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0)
|
#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0)
|
#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR ){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tWarn(...) do { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }} while(0)
|
#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tInfo(...) do { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }} while(0)
|
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0)
|
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
|
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
|
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } }
|
||||||
|
|
||||||
//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0)
|
//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0)
|
||||||
#define tGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0)
|
#define tGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
Loading…
Reference in New Issue