Merge pull request #19555 from taosdata/enh/TD-20047
fix: update epset on dnode info changed
This commit is contained in:
commit
dbea8004a9
|
@ -71,6 +71,7 @@ void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
|
||||||
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
|
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
|
||||||
void tmsgReportStartup(const char* name, const char* desc);
|
void tmsgReportStartup(const char* name, const char* desc);
|
||||||
void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
|
void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
|
||||||
|
void tmsgUpdateDnodeEpSet(SEpSet* epset);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,6 +100,7 @@ typedef struct {
|
||||||
bool stopped;
|
bool stopped;
|
||||||
SEpSet mnodeEps;
|
SEpSet mnodeEps;
|
||||||
SArray *dnodeEps;
|
SArray *dnodeEps;
|
||||||
|
SArray *oldDnodeEps;
|
||||||
SHashObj *dnodeHash;
|
SHashObj *dnodeHash;
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
|
|
|
@ -332,40 +332,48 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmUpdateDnodeInfo(void *data, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port) {
|
void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) {
|
||||||
SDnodeData *pData = data;
|
SDnodeData *pData = data;
|
||||||
int32_t ret = -1;
|
int32_t dnodeId = -1;
|
||||||
|
if (did != NULL) dnodeId = *did;
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&pData->lock);
|
taosThreadRwlockRdlock(&pData->lock);
|
||||||
if (*dnodeId <= 0) {
|
|
||||||
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->dnodeEps); ++i) {
|
if (pData->oldDnodeEps != NULL) {
|
||||||
|
int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SDnodeEp *pDnodeEp = taosArrayGet(pData->oldDnodeEps, i);
|
||||||
|
if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
|
||||||
|
dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
|
||||||
|
tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
|
||||||
|
*port = pDnodeEp->ep.port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (did != NULL && dnodeId <= 0) {
|
||||||
|
int32_t size = (int32_t)taosArrayGetSize(pData->dnodeEps);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
|
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
|
||||||
if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
|
if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
|
||||||
dInfo("dnode:%s:%u, update dnodeId from %d to %d", fqdn, *port, *dnodeId, pDnodeEp->id);
|
dInfo("dnode:%s:%u, update dnodeId to dnode:%d", fqdn, *port, pDnodeEp->id);
|
||||||
*dnodeId = pDnodeEp->id;
|
*did = pDnodeEp->id;
|
||||||
if (clusterId != NULL) *clusterId = pData->clusterId;
|
if (clusterId != NULL) *clusterId = pData->clusterId;
|
||||||
ret = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ret != 0) {
|
|
||||||
dInfo("dnode:%s:%u, failed to update dnodeId:%d", fqdn, *port, *dnodeId);
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, dnodeId, sizeof(int32_t));
|
if (dnodeId > 0) {
|
||||||
|
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
|
||||||
if (pDnodeEp) {
|
if (pDnodeEp) {
|
||||||
if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0) {
|
if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0 || pDnodeEp->ep.port != *port) {
|
||||||
dInfo("dnode:%d, update port from %s to %s", *dnodeId, fqdn, pDnodeEp->ep.fqdn);
|
dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
|
||||||
tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
|
tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
|
||||||
}
|
|
||||||
if (pDnodeEp->ep.port != *port) {
|
|
||||||
dInfo("dnode:%d, update port from %u to %u", *dnodeId, *port, pDnodeEp->ep.port);
|
|
||||||
*port = pDnodeEp->ep.port;
|
*port = pDnodeEp->ep.port;
|
||||||
}
|
}
|
||||||
if (clusterId != NULL) *clusterId = pData->clusterId;
|
if (clusterId != NULL) *clusterId = pData->clusterId;
|
||||||
ret = 0;
|
|
||||||
} else {
|
|
||||||
dInfo("dnode:%d, failed to update dnode info", *dnodeId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadRwlockUnlock(&pData->lock);
|
taosThreadRwlockUnlock(&pData->lock);
|
||||||
// return ret;
|
|
||||||
}
|
}
|
|
@ -742,6 +742,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
|
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
|
||||||
goto CM_DECODE_OVER;
|
goto CM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
tmsgUpdateDnodeEpSet(&pConsumer->ep);
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
|
|
@ -180,6 +180,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
|
|
|
@ -760,6 +760,27 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||||
goto SUB_DECODE_OVER;
|
goto SUB_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update epset saved in mnode
|
||||||
|
if (pSub->unassignedVgs != NULL) {
|
||||||
|
int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SMqVgEp *pMqVgEp = taosArrayGet(pSub->unassignedVgs, i);
|
||||||
|
tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pSub->consumerHash != NULL) {
|
||||||
|
void *pIter = taosHashIterate(pSub->consumerHash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SMqConsumerEp *pConsumerEp = pIter;
|
||||||
|
int32_t size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SMqVgEp *pMqVgEp = taosArrayGet(pConsumerEp->vgs, i);
|
||||||
|
tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SUB_DECODE_OVER:
|
SUB_DECODE_OVER:
|
||||||
|
|
|
@ -329,6 +329,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
} else if (action.actionType == TRANS_ACTION_MSG) {
|
} else if (action.actionType == TRANS_ACTION_MSG) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
|
||||||
|
tmsgUpdateDnodeEpSet(&action.epSet);
|
||||||
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
|
||||||
|
|
|
@ -62,3 +62,9 @@ void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.repo
|
||||||
void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) {
|
void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) {
|
||||||
(*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port);
|
(*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tmsgUpdateDnodeEpSet(SEpSet* epset) {
|
||||||
|
for (int32_t i = 0; i < epset->numOfEps; ++i) {
|
||||||
|
tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue