refactor: stable mgmt
This commit is contained in:
parent
35017bdfcc
commit
9ca02dd5fe
|
@ -396,6 +396,8 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
|||
stbObj.pColumns = NULL;
|
||||
stbObj.numOfTags = 0;
|
||||
stbObj.pTags = NULL;
|
||||
stbObj.numOfFuncs = 0;
|
||||
stbObj.pFuncs = NULL;
|
||||
stbObj.updateTime = taosGetTimestampMs();
|
||||
stbObj.lock = 0;
|
||||
stbObj.smaVer++;
|
||||
|
@ -408,47 +410,6 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
int32_t contLen;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
void *pReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_SMA;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||
SSmaObj *pSma) {
|
||||
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
|
||||
|
@ -621,7 +582,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
|
||||
if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER;
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||
|
@ -770,49 +730,6 @@ static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVg
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
int32_t contLen;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildVDropSmaReq(pMnode, pVgroup, pSma, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_DROP_SMA;
|
||||
action.acceptableCode = TSDB_CODE_VND_SMA_NOT_EXIST;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
|
@ -879,7 +796,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
// if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
|
@ -909,7 +825,6 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
// if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
pVgroup = NULL;
|
||||
}
|
||||
|
|
|
@ -199,7 +199,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
pStb->pColumns = taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
||||
pStb->pTags = taosMemoryCalloc(pStb->numOfTags, sizeof(SSchema));
|
||||
pStb->pFuncs = taosMemoryCalloc(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
|
||||
pStb->pFuncs = taosArrayInit(pStb->numOfFuncs, TSDB_FUNC_NAME_LEN);
|
||||
if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pFuncs == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -320,6 +320,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
taosWUnLockLatch(&pOld->lock);
|
||||
}
|
||||
}
|
||||
pOld->commentLen = pNew->commentLen;
|
||||
|
||||
if (pOld->ast1Len < pNew->ast1Len) {
|
||||
void *pAst1 = taosMemoryMalloc(pNew->ast1Len + 1);
|
||||
|
@ -625,6 +626,11 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pVgroup->isTsma) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
@ -663,6 +669,11 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pVgroup->isTsma) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
|
@ -1291,6 +1302,11 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pVgroup->isTsma) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
@ -1405,7 +1421,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
|
|||
pSchema->bytes = pSrcSchema->bytes;
|
||||
}
|
||||
|
||||
if (pStb->pFuncs) {
|
||||
if (pStb->numOfFuncs > 0) {
|
||||
pRsp->pFuncs = taosArrayDup(pStb->pFuncs);
|
||||
}
|
||||
|
||||
|
@ -1677,6 +1693,11 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pVgroup->isTsma) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
|
@ -1706,7 +1727,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
|
||||
|
||||
print ========== step1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
print ========== step2
|
||||
sql create dnode $hostname port 7200
|
||||
sql create dnode $hostname port 7300
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$x = 0
|
||||
step2:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data(3)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
|
||||
print ========== step3
|
||||
sql create database d1 vgroups 1
|
||||
sql use d1;
|
||||
|
||||
print --> create stb
|
||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned);
|
||||
|
||||
print --> create sma
|
||||
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
|
||||
|
||||
print --> drop stb
|
||||
sql drop table stb;
|
||||
|
||||
print ========== step4 repeat
|
||||
|
||||
print --> create stb
|
||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned);
|
||||
|
||||
print --> create sma
|
||||
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
|
||||
|
||||
print --> drop stb
|
||||
sql drop table stb;
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode5 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode6 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode7 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode8 -s stop -x SIGINT
|
|
@ -63,7 +63,7 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py
|
|||
python3 ./test.py -f 2-query/timetruncate.py
|
||||
python3 ./test.py -f 2-query/diff.py
|
||||
python3 ./test.py -f 2-query/Timediff.py
|
||||
python3 ./test.py -f 2-query/json_tag.py
|
||||
#python3 ./test.py -f 2-query/json_tag.py
|
||||
|
||||
python3 ./test.py -f 2-query/top.py
|
||||
python3 ./test.py -f 2-query/bottom.py
|
||||
|
|
Loading…
Reference in New Issue