feat:write meta from tmq to taosd
This commit is contained in:
parent
b4cadcbef2
commit
77cb2c0b84
|
@ -45,6 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
|
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
|
||||||
|
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp);
|
||||||
|
|
||||||
int32_t mndInitStb(SMnode *pMnode) {
|
int32_t mndInitStb(SMnode *pMnode) {
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
|
@ -854,6 +855,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SMCreateStbReq createReq = {0};
|
SMCreateStbReq createReq = {0};
|
||||||
|
bool isAlter = false;
|
||||||
|
|
||||||
if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -889,6 +891,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||||
|
isAlter = true;
|
||||||
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
|
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
|
||||||
} else {
|
} else {
|
||||||
mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
|
mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
|
||||||
|
@ -929,7 +932,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isAlter) {
|
||||||
|
bool needRsp = false;
|
||||||
|
code = mndAlterStbImp(pMnode, pReq, pDb, pStb, needRsp);
|
||||||
|
} else {
|
||||||
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
||||||
|
}
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -1495,14 +1503,13 @@ static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *t
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
|
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
|
||||||
int32_t *pLen) {
|
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
SEncoder ec = {0};
|
SEncoder ec = {0};
|
||||||
uint32_t contLen = 0;
|
uint32_t contLen = 0;
|
||||||
SMAlterStbRsp alterRsp = {0};
|
SMAlterStbRsp alterRsp = {0};
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||||
if (NULL == alterRsp.pMeta) {
|
if (NULL == alterRsp.pMeta) {
|
||||||
|
@ -1535,10 +1542,36 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp) {
|
||||||
|
int32_t code = -1;
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||||
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
||||||
|
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
|
||||||
|
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||||
|
|
||||||
|
if (needRsp) {
|
||||||
|
void *pCont = NULL;
|
||||||
|
int32_t contLen = 0;
|
||||||
|
if (mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen) != 0) goto _OVER;
|
||||||
|
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||||
bool needRsp = true;
|
bool needRsp = true;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = NULL;
|
|
||||||
SField *pField0 = NULL;
|
SField *pField0 = NULL;
|
||||||
|
|
||||||
SStbObj stbObj = {0};
|
SStbObj stbObj = {0};
|
||||||
|
@ -1587,30 +1620,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != 0) goto _OVER;
|
if (code != 0) goto _OVER;
|
||||||
|
code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp);
|
||||||
code = -1;
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
|
||||||
if (pTrans == NULL) goto _OVER;
|
|
||||||
|
|
||||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
|
||||||
|
|
||||||
if (needRsp) {
|
|
||||||
void *pCont = NULL;
|
|
||||||
int32_t contLen = 0;
|
|
||||||
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
|
|
||||||
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
|
||||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
|
||||||
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
|
||||||
taosMemoryFreeClear(stbObj.pTags);
|
taosMemoryFreeClear(stbObj.pTags);
|
||||||
taosMemoryFreeClear(stbObj.pColumns);
|
taosMemoryFreeClear(stbObj.pColumns);
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue