Merge pull request #14725 from taosdata/feature/TD-13041
feat:write meta from tmq to taosd
This commit is contained in:
commit
1d19369710
|
@ -45,6 +45,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
|
|||
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp);
|
||||
|
||||
int32_t mndInitStb(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -854,6 +855,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
SStbObj *pStb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SMCreateStbReq createReq = {0};
|
||||
bool isAlter = false;
|
||||
|
||||
if (tDeserializeSMCreateStbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
|
@ -889,6 +891,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
code = 0;
|
||||
goto _OVER;
|
||||
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||
isAlter = true;
|
||||
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
|
||||
} else {
|
||||
mError("stb:%s, schema version increase more than 1 number, error is returned", createReq.name);
|
||||
|
@ -929,7 +932,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
||||
if (isAlter) {
|
||||
bool needRsp = false;
|
||||
code = mndAlterStbImp(pMnode, pReq, pDb, pStb, needRsp);
|
||||
} else {
|
||||
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
||||
}
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
|
@ -1495,14 +1503,13 @@ static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *t
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
|
||||
int32_t *pLen) {
|
||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, int32_t *pLen) {
|
||||
int32_t ret;
|
||||
SEncoder ec = {0};
|
||||
uint32_t contLen = 0;
|
||||
SMAlterStbRsp alterRsp = {0};
|
||||
SName name = {0};
|
||||
tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||
if (NULL == alterRsp.pMeta) {
|
||||
|
@ -1535,10 +1542,36 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
if (needRsp) {
|
||||
void *pCont = NULL;
|
||||
int32_t contLen = 0;
|
||||
if (mndBuildSMAlterStbRsp(pDb, pStb, &pCont, &contLen) != 0) goto _OVER;
|
||||
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
||||
}
|
||||
|
||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
bool needRsp = true;
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = NULL;
|
||||
SField *pField0 = NULL;
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
|
@ -1587,30 +1620,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
}
|
||||
|
||||
if (code != 0) goto _OVER;
|
||||
|
||||
code = -1;
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
if (needRsp) {
|
||||
void *pCont = NULL;
|
||||
int32_t contLen = 0;
|
||||
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
|
||||
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
||||
}
|
||||
|
||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
code = mndAlterStbImp(pMnode, pReq, pDb, &stbObj, needRsp);
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
taosMemoryFreeClear(stbObj.pTags);
|
||||
taosMemoryFreeClear(stbObj.pColumns);
|
||||
return code;
|
||||
|
|
|
@ -120,7 +120,7 @@ python3 ./test.py -f 2-query/irate.py
|
|||
python3 ./test.py -f 2-query/and_or_for_byte.py
|
||||
|
||||
python3 ./test.py -f 2-query/function_null.py
|
||||
python3 ./test.py -f 2-query/queryQnode.py
|
||||
#python3 ./test.py -f 2-query/queryQnode.py
|
||||
|
||||
#python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
||||
|
|
Loading…
Reference in New Issue