From 64b0c3a02202c7c462f9653e1bbdd756f6fdb7de Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 20 Apr 2022 14:21:44 +0800 Subject: [PATCH] refactor: check db options --- include/common/tmsg.h | 29 +++- source/common/src/tmsg.c | 81 ++++++++++ source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 3 - source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 170 +++++--------------- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 9 -- source/dnode/mnode/impl/src/mndDb.c | 77 +++++++-- 6 files changed, 213 insertions(+), 156 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f773aabd70..27911bbcda 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -823,7 +823,7 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; int32_t numOfRetensions; SArray* pRetensions; // SRetention -} SCreateVnodeReq, SAlterVnodeReq; +} SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); @@ -834,11 +834,36 @@ typedef struct { int32_t dnodeId; int64_t dbUid; char db[TSDB_DB_FNAME_LEN]; -} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; +} SDropVnodeReq; int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); +typedef struct { + int64_t dbUid; + char db[TSDB_DB_FNAME_LEN]; +} SCompactVnodeReq; + +int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); +int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); + +typedef struct { + int32_t vgVersion; + int32_t totalBlocks; + int32_t daysToKeep0; + int32_t daysToKeep1; + int32_t daysToKeep2; + int8_t walLevel; + int8_t strict; + int8_t cacheLastRow; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; +} SAlterVnodeReq; + +int32_t tSerializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq); +int32_t tDeserializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq); + typedef struct { SMsgHead header; char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e5cd334e46..dd8ded0347 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2954,6 +2954,87 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq return 0; } +int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + + +int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; + if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysToKeep2) < 0) return -1; + if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; + if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; + if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; + if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; + if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { + SReplica *pReplica = &pReq->replicas[i]; + if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysToKeep2) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { + SReplica *pReplica = &pReq->replicas[i]; + if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; + } + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + + int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 491c68b010..02be03ebba 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -89,10 +89,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); // vmHandle.c void vmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4b59afabd3..c2626d1aaa 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -203,48 +203,6 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; - SAlterVnodeReq alterReq = {0}; - if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - dDebug("vgId:%d, alter vnode req is received", alterReq.vgId); - - SVnodeCfg vnodeCfg = {0}; - vmGenerateVnodeCfg(&alterReq, &vnodeCfg); - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, alterReq.vgId); - if (pVnode == NULL) { - dDebug("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr()); - return -1; - } - - if (alterReq.vgVersion == pVnode->vgVersion) { - vmReleaseVnode(pMgmt, pVnode); - dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", alterReq.vgId); - return 0; - } - - if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { - dError("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr()); - vmReleaseVnode(pMgmt, pVnode); - return -1; - } - - int32_t oldVersion = pVnode->vgVersion; - pVnode->vgVersion = alterReq.vgVersion; - int32_t code = vmWriteVnodesToFile(pMgmt); - if (code != 0) { - pVnode->vgVersion = oldVersion; - } - - vmReleaseVnode(pMgmt, pVnode); - return code; -} - int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDropVnodeReq dropReq = {0}; @@ -276,100 +234,52 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; - SSyncVnodeReq syncReq = {0}; - tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq); - - int32_t vgId = syncReq.vgId; - dDebug("vgId:%d, sync vnode req is received", vgId); - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); - if (pVnode == NULL) { - dDebug("vgId:%d, failed to sync since %s", vgId, terrstr()); - return -1; - } - - if (vnodeSync(pVnode->pImpl) != 0) { - dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr()); - vmReleaseVnode(pMgmt, pVnode); - return -1; - } - - vmReleaseVnode(pMgmt, pVnode); - return 0; -} - -int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; - SCompactVnodeReq compatcReq = {0}; - tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq); - - int32_t vgId = compatcReq.vgId; - dDebug("vgId:%d, compact vnode req is received", vgId); - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); - if (pVnode == NULL) { - dDebug("vgId:%d, failed to compact since %s", vgId, terrstr()); - return -1; - } - - if (vnodeCompact(pVnode->pImpl) != 0) { - dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr()); - vmReleaseVnode(pMgmt, pVnode); - return -1; - } - - vmReleaseVnode(pMgmt, pVnode); - return 0; -} - void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE); // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, DEFAULT_HANDLE); + + dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 777f9eb36e..dce5f1b422 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -42,18 +42,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); break; - case TDMT_DND_ALTER_VNODE: - code = vmProcessAlterVnodeReq(pMgmt, pMsg); - break; case TDMT_DND_DROP_VNODE: code = vmProcessDropVnodeReq(pMgmt, pMsg); break; - case TDMT_DND_SYNC_VNODE: - code = vmProcessSyncVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_COMPACT_VNODE: - code = vmProcessCompactVnodeReq(pMgmt, pMsg); - break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dError("msg:%p, not processed in vnode-mgmt queue", pMsg); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 11b44ea316..3372532995 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -614,7 +614,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { return terrno; } -static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { SSdbRaw *pRedoRaw = mndDbActionEncode(pOld); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; @@ -623,7 +623,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { SSdbRaw *pCommitRaw = mndDbActionEncode(pNew); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -632,7 +632,60 @@ static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj * return 0; } -static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { +void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { + SAlterVnodeReq alterReq = {0}; + alterReq.vgVersion = pVgroup->version; + alterReq.totalBlocks = pDb->cfg.totalBlocks; + alterReq.daysToKeep0 = pDb->cfg.daysToKeep0; + alterReq.daysToKeep1 = pDb->cfg.daysToKeep1; + alterReq.daysToKeep2 = pDb->cfg.daysToKeep2; + alterReq.walLevel = pDb->cfg.walLevel; + alterReq.strict = pDb->cfg.strict; + alterReq.cacheLastRow = pDb->cfg.cacheLastRow; + alterReq.replica = pVgroup->replica; + alterReq.selfIndex = -1; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SReplica *pReplica = &alterReq.replicas[v]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pVgidDnode == NULL) { + return NULL; + } + + pReplica->id = pVgidDnode->id; + pReplica->port = pVgidDnode->port; + memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); + mndReleaseDnode(pMnode, pVgidDnode); + + if (pDnode->id == pVgid->dnodeId) { + alterReq.selfIndex = v; + } + } + + if (alterReq.selfIndex == -1) { + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSAlterVnodeReq(pReq, contLen, &alterReq); + *pContLen = contLen; + return pReq; +} + +static int32_t mndBuilAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { STransAction action = {0}; SVnodeGid *pVgid = pVgroup->vnodeGid + vn; @@ -643,7 +696,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj mndReleaseDnode(pMnode, pDnode); int32_t contLen = 0; - void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + void *pReq = mndBuildAlterVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; @@ -658,7 +711,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -668,7 +721,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid == pNew->uid) { - if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) { + if (mndBuilAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); return -1; @@ -681,17 +734,17 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndUpdateDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndAlterDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg); if (pTrans == NULL) goto UPDATE_DB_OVER; - mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name); + mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name); mndTransSetDbInfo(pTrans, pOld); - if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; - if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; - if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; + if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; + if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; + if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_DB_OVER; code = 0; @@ -740,7 +793,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) { dbObj.cfgVersion++; dbObj.updateTime = taosGetTimestampMs(); - code = mndUpdateDb(pMnode, pReq, pDb, &dbObj); + code = mndAlterDb(pMnode, pReq, pDb, &dbObj); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; ALTER_DB_OVER: