add query epset
This commit is contained in:
parent
53efe75146
commit
e389dcedc8
|
@ -41,6 +41,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||||
|
pMsg->info.hasEpSet = 1;
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||||
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
|
||||||
|
|
||||||
static void mndPullupTrans(SMnode *pMnode) {
|
static void mndPullupTrans(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void * pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
@ -67,14 +67,14 @@ static void mndPullupTrans(SMnode *pMnode) {
|
||||||
|
|
||||||
static void mndTtlTimer(SMnode *pMnode) {
|
static void mndTtlTimer(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void * pReq = mndBuildTimerMsg(&contLen);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCalMqRebalance(SMnode *pMnode) {
|
static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void * pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
|
@ -83,7 +83,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
|
|
||||||
static void mndPullupTelem(SMnode *pMnode) {
|
static void mndPullupTelem(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void * pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
|
@ -395,7 +395,7 @@ void mndStop(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode * pMnode = pMsg->info.node;
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -413,7 +413,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
char * syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||||
static int64_t mndTick = 0;
|
static int64_t mndTick = 0;
|
||||||
if (++mndTick % 10 == 1) {
|
if (++mndTick % 10 == 1) {
|
||||||
mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||||
|
@ -552,6 +552,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
||||||
pMsg->info.rsp = rpcMallocCont(contLen);
|
pMsg->info.rsp = rpcMallocCont(contLen);
|
||||||
|
pMsg->info.hasEpSet = 1;
|
||||||
if (pMsg->info.rsp != NULL) {
|
if (pMsg->info.rsp != NULL) {
|
||||||
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
|
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
|
||||||
pMsg->info.rspLen = contLen;
|
pMsg->info.rspLen = contLen;
|
||||||
|
@ -578,7 +579,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode * pMnode = pMsg->info.node;
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||||
|
@ -631,7 +632,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
|
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
|
||||||
if (mndAcquireRpcRef(pMnode) != 0) return -1;
|
if (mndAcquireRpcRef(pMnode) != 0) return -1;
|
||||||
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
int64_t ms = taosGetTimestampMs();
|
int64_t ms = taosGetTimestampMs();
|
||||||
|
|
||||||
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
|
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
|
||||||
|
@ -712,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
|
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
|
||||||
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
|
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
|
||||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
SVnodeGid * pVgid = &pVgroup->vnodeGid[i];
|
||||||
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
|
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
|
||||||
pVnDesc->dnode_id = pVgid->dnodeId;
|
pVnDesc->dnode_id = pVgid->dnodeId;
|
||||||
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
|
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
|
||||||
|
|
|
@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnode *pVnode = pInfo->ahandle;
|
SVnode * pVnode = pInfo->ahandle;
|
||||||
int32_t vgId = pVnode->config.vgId;
|
int32_t vgId = pVnode->config.vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
@ -178,7 +178,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
||||||
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
||||||
}
|
}
|
||||||
|
pMsg->info.hasEpSet = 1;
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||||
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||||
} else {
|
} else {
|
||||||
|
@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnode *pVnode = pInfo->ahandle;
|
SVnode * pVnode = pInfo->ahandle;
|
||||||
int32_t vgId = pVnode->config.vgId;
|
int32_t vgId = pVnode->config.vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
STraceId *trace = &pMsg->info.traceId;
|
STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
char * syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||||
static int64_t vndTick = 0;
|
static int64_t vndTick = 0;
|
||||||
if (++vndTick % 10 == 1) {
|
if (++vndTick % 10 == 1) {
|
||||||
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||||
|
@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode * pVnode = pFsm->data;
|
||||||
SSnapshot snapshot = {0};
|
SSnapshot snapshot = {0};
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
char logBuf[256] = {0};
|
char logBuf[256] = {0};
|
||||||
|
|
|
@ -1014,19 +1014,28 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
||||||
if (pResp == NULL || pResp->info.hasEpSet == 0) {
|
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// rebuild resp msg
|
||||||
|
SEpSet epset;
|
||||||
|
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst);
|
|
||||||
int32_t tlen = tSerializeSEpSet(NULL, 0, dst);
|
int32_t tlen = tSerializeSEpSet(NULL, 0, dst);
|
||||||
|
|
||||||
int32_t bufLen = pResp->contLen - tlen;
|
char* buf = NULL;
|
||||||
char* buf = rpcMallocCont(bufLen);
|
int32_t len = pResp->contLen - tlen;
|
||||||
|
if (len != 0) {
|
||||||
memcpy(buf, (char*)pResp->pCont + tlen, bufLen);
|
buf = rpcMallocCont(len);
|
||||||
|
memcpy(buf, (char*)pResp->pCont + tlen, len);
|
||||||
|
}
|
||||||
|
rpcFreeCont(pResp->pCont);
|
||||||
|
|
||||||
pResp->pCont = buf;
|
pResp->pCont = buf;
|
||||||
pResp->contLen = bufLen;
|
pResp->contLen = len;
|
||||||
|
|
||||||
|
*dst = epset;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
|
|
Loading…
Reference in New Issue