fix/move-status-to-new-queue

This commit is contained in:
dmchen 2025-02-27 11:02:18 +08:00
parent 2d93259f1a
commit 031f176aa1
8 changed files with 33 additions and 7 deletions

View File

@ -31,6 +31,7 @@ typedef enum {
QUERY_QUEUE, QUERY_QUEUE,
FETCH_QUEUE, FETCH_QUEUE,
READ_QUEUE, READ_QUEUE,
STATUS_QUEUE,
WRITE_QUEUE, WRITE_QUEUE,
APPLY_QUEUE, APPLY_QUEUE,
SYNC_QUEUE, SYNC_QUEUE,

View File

@ -267,7 +267,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
code = code =
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000); rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (code != 0) { if (code != 0) {
dError("failed to send status req since %s", tstrerror(code)); dError("failed to SendRecv with timeout %d status req since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
return; return;
} }
@ -275,7 +275,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmRotateMnodeEpSet(pMgmt->pData); dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[512]; char tbuf[512];
dmEpSetToStr(tbuf, sizeof(tbuf), &epSet); dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse); dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
tbuf, epSet.inUse);
} else { } else {
if (epUpdated == 1) { if (epUpdated == 1) {
dmSetMnodeEpSet(pMgmt->pData, &epSet); dmSetMnodeEpSet(pMgmt->pData, &epSet);
@ -403,7 +404,7 @@ void dmSendConfigReq(SDnodeMgmt *pMgmt) {
code = code =
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000); rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (code != 0) { if (code != 0) {
dError("failed to send status req since %s", tstrerror(code)); dError("failed to SendRecv config req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
return; return;
} }
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {

View File

@ -32,6 +32,7 @@ typedef struct SMnodeMgmt {
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker fetchWorker; SSingleWorker fetchWorker;
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker statusWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker arbWorker; SSingleWorker arbWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
@ -58,6 +59,7 @@ int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);

View File

@ -203,7 +203,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToStatusQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -133,6 +133,10 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
} }
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->statusWorker, pMsg);
}
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
if (NULL == pMgmt->pMnode) { if (NULL == pMgmt->pMnode) {
@ -172,6 +176,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
case READ_QUEUE: case READ_QUEUE:
pWorker = &pMgmt->readWorker; pWorker = &pMgmt->readWorker;
break; break;
case STATUS_QUEUE:
pWorker = &pMgmt->statusWorker;
break;
case ARB_QUEUE: case ARB_QUEUE:
pWorker = &pMgmt->arbWorker; pWorker = &pMgmt->arbWorker;
break; break;
@ -246,6 +253,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return code; return code;
} }
SSingleWorkerCfg stautsCfg = {
.min = 1,
.max = 1,
.name = "mnode-status",
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &stautsCfg)) != 0) {
dError("failed to start mnode-status worker since %s", tstrerror(code));
return code;
}
SSingleWorkerCfg wCfg = { SSingleWorkerCfg wCfg = {
.min = 1, .min = 1,
.max = 1, .max = 1,
@ -304,6 +323,7 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker);
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->statusWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->arbWorker); tSingleWorkerCleanup(&pMgmt->arbWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker); tSingleWorkerCleanup(&pMgmt->syncWorker);

View File

@ -71,6 +71,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
code = taosWriteQitem(pMgmt->queryWorker.queue, pMsg); code = taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return code; return code;
case READ_QUEUE: case READ_QUEUE:
case STATUS_QUEUE:
case FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen); dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen);
code = taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); code = taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);

View File

@ -65,7 +65,8 @@ class MndTestTrans2 : public ::testing::Test {
msgCb.sendRspFp = sendRsp; msgCb.sendRspFp = sendRsp;
msgCb.queueFps[SYNC_QUEUE] = putToQueue; msgCb.queueFps[SYNC_QUEUE] = putToQueue;
msgCb.queueFps[WRITE_QUEUE] = putToQueue; msgCb.queueFps[WRITE_QUEUE] = putToQueue;
msgCb.queueFps[READ_QUEUE] = putToQueue; msgCb.queueFps[READ_QUEUE] = putToQueue;
msgCb.queueFps[STATUS_QUEUE] = putToQueue;
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb); tmsgSetDefault(&msgCb);

View File

@ -50,8 +50,8 @@ extern "C" {
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); } #define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); } #define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); } #define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, false, pNode, __VA_ARGS__); } #define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, false, pNode, __VA_ARGS__); } #define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); }
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); } #define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); } #define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }