refactor: check db options
This commit is contained in:
parent
76883aa42f
commit
64b0c3a022
|
@ -823,7 +823,7 @@ typedef struct {
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
int32_t numOfRetensions;
|
int32_t numOfRetensions;
|
||||||
SArray* pRetensions; // SRetention
|
SArray* pRetensions; // SRetention
|
||||||
} SCreateVnodeReq, SAlterVnodeReq;
|
} SCreateVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
|
@ -834,11 +834,36 @@ typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
|
} SDropVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
||||||
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t dbUid;
|
||||||
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
} SCompactVnodeReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||||
|
int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgVersion;
|
||||||
|
int32_t totalBlocks;
|
||||||
|
int32_t daysToKeep0;
|
||||||
|
int32_t daysToKeep1;
|
||||||
|
int32_t daysToKeep2;
|
||||||
|
int8_t walLevel;
|
||||||
|
int8_t strict;
|
||||||
|
int8_t cacheLastRow;
|
||||||
|
int8_t replica;
|
||||||
|
int8_t selfIndex;
|
||||||
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
|
} SAlterVnodeReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq);
|
||||||
|
int32_t tDeserializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
|
|
@ -2954,6 +2954,87 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *pReq) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *pReq) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->daysToKeep2) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
SReplica *pReplica = &pReq->replicas[i];
|
||||||
|
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->daysToKeep2) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
SReplica *pReplica = &pReq->replicas[i];
|
||||||
|
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
|
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
|
@ -89,10 +89,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
// vmHandle.c
|
// vmHandle.c
|
||||||
void vmInitMsgHandle(SMgmtWrapper *pWrapper);
|
void vmInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||||
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
||||||
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
|
||||||
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
||||||
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
|
||||||
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
|
||||||
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
|
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
|
||||||
int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
|
int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
|
||||||
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
|
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
|
||||||
|
|
|
@ -203,48 +203,6 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
SRpcMsg *pReq = &pMsg->rpcMsg;
|
|
||||||
SAlterVnodeReq alterReq = {0};
|
|
||||||
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("vgId:%d, alter vnode req is received", alterReq.vgId);
|
|
||||||
|
|
||||||
SVnodeCfg vnodeCfg = {0};
|
|
||||||
vmGenerateVnodeCfg(&alterReq, &vnodeCfg);
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, alterReq.vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
dDebug("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (alterReq.vgVersion == pVnode->vgVersion) {
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", alterReq.vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
|
|
||||||
dError("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t oldVersion = pVnode->vgVersion;
|
|
||||||
pVnode->vgVersion = alterReq.vgVersion;
|
|
||||||
int32_t code = vmWriteVnodesToFile(pMgmt);
|
|
||||||
if (code != 0) {
|
|
||||||
pVnode->vgVersion = oldVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SRpcMsg *pReq = &pMsg->rpcMsg;
|
SRpcMsg *pReq = &pMsg->rpcMsg;
|
||||||
SDropVnodeReq dropReq = {0};
|
SDropVnodeReq dropReq = {0};
|
||||||
|
@ -276,100 +234,52 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
SRpcMsg *pReq = &pMsg->rpcMsg;
|
|
||||||
SSyncVnodeReq syncReq = {0};
|
|
||||||
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq);
|
|
||||||
|
|
||||||
int32_t vgId = syncReq.vgId;
|
|
||||||
dDebug("vgId:%d, sync vnode req is received", vgId);
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vnodeSync(pVnode->pImpl) != 0) {
|
|
||||||
dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
SRpcMsg *pReq = &pMsg->rpcMsg;
|
|
||||||
SCompactVnodeReq compatcReq = {0};
|
|
||||||
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq);
|
|
||||||
|
|
||||||
int32_t vgId = compatcReq.vgId;
|
|
||||||
dDebug("vgId:%d, compact vnode req is received", vgId);
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vnodeCompact(pVnode->pImpl) != 0) {
|
|
||||||
dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
|
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,18 +42,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
case TDMT_DND_CREATE_VNODE:
|
case TDMT_DND_CREATE_VNODE:
|
||||||
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
|
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
case TDMT_DND_ALTER_VNODE:
|
|
||||||
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
case TDMT_DND_DROP_VNODE:
|
case TDMT_DND_DROP_VNODE:
|
||||||
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
case TDMT_DND_SYNC_VNODE:
|
|
||||||
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
case TDMT_DND_COMPACT_VNODE:
|
|
||||||
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||||
|
|
|
@ -614,7 +614,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
|
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
|
@ -623,7 +623,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||||
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
|
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) return -1;
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
@ -632,7 +632,60 @@ static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
||||||
|
SAlterVnodeReq alterReq = {0};
|
||||||
|
alterReq.vgVersion = pVgroup->version;
|
||||||
|
alterReq.totalBlocks = pDb->cfg.totalBlocks;
|
||||||
|
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||||
|
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||||
|
alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
|
||||||
|
alterReq.walLevel = pDb->cfg.walLevel;
|
||||||
|
alterReq.strict = pDb->cfg.strict;
|
||||||
|
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||||
|
alterReq.replica = pVgroup->replica;
|
||||||
|
alterReq.selfIndex = -1;
|
||||||
|
|
||||||
|
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||||
|
SReplica *pReplica = &alterReq.replicas[v];
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
|
||||||
|
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
if (pVgidDnode == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pReplica->id = pVgidDnode->id;
|
||||||
|
pReplica->port = pVgidDnode->port;
|
||||||
|
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
mndReleaseDnode(pMnode, pVgidDnode);
|
||||||
|
|
||||||
|
if (pDnode->id == pVgid->dnodeId) {
|
||||||
|
alterReq.selfIndex = v;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (alterReq.selfIndex == -1) {
|
||||||
|
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
|
||||||
|
if (contLen < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSAlterVnodeReq(pReq, contLen, &alterReq);
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndBuilAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||||
|
@ -643,7 +696,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
void *pReq = mndBuildAlterVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -658,7 +711,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
@ -668,7 +721,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (pVgroup->dbUid == pNew->uid) {
|
if (pVgroup->dbUid == pNew->uid) {
|
||||||
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
|
if (mndBuilAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -681,17 +734,17 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndAlterDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto UPDATE_DB_OVER;
|
if (pTrans == NULL) goto UPDATE_DB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
|
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
|
||||||
|
|
||||||
mndTransSetDbInfo(pTrans, pOld);
|
mndTransSetDbInfo(pTrans, pOld);
|
||||||
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
||||||
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
||||||
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_DB_OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_DB_OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -740,7 +793,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
|
||||||
|
|
||||||
dbObj.cfgVersion++;
|
dbObj.cfgVersion++;
|
||||||
dbObj.updateTime = taosGetTimestampMs();
|
dbObj.updateTime = taosGetTimestampMs();
|
||||||
code = mndUpdateDb(pMnode, pReq, pDb, &dbObj);
|
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
|
||||||
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
ALTER_DB_OVER:
|
ALTER_DB_OVER:
|
||||||
|
|
Loading…
Reference in New Issue