Merge branch 'enh/3.0' into enh/triggerCheckPoint2
This commit is contained in:
commit
1531e2d7d3
|
@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
|
||||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq,
|
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq,
|
||||||
const char *opername);
|
const char *opername);
|
||||||
void mndTransDrop(STrans *pTrans);
|
void mndTransDrop(STrans *pTrans);
|
||||||
|
|
||||||
|
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendNullLog(STrans *pTrans);
|
int32_t mndTransAppendNullLog(STrans *pTrans);
|
||||||
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction);
|
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
||||||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||||
|
|
|
@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
|
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
|
||||||
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
||||||
int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg);
|
int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg);
|
||||||
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
|
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
|
||||||
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
||||||
|
|
|
@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
||||||
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
||||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
||||||
|
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
|
||||||
|
|
||||||
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
|
||||||
|
@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) {
|
||||||
.insertFp = (SdbInsertFp)mndDbActionInsert,
|
.insertFp = (SdbInsertFp)mndDbActionInsert,
|
||||||
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
|
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
|
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
|
||||||
|
.validateFp = (SdbValidateFp)mndNewDbActionValidate,
|
||||||
};
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
|
||||||
|
@ -247,6 +250,19 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
|
||||||
|
SDbObj *pNewDb = pObj;
|
||||||
|
|
||||||
|
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
|
||||||
|
if (pOldDb != NULL) {
|
||||||
|
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
|
||||||
|
sdbRelease(pMnode->pSdb, pOldDb);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
||||||
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
|
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -448,9 +464,18 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
|
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
|
if (pDbRaw == NULL) return -1;
|
||||||
|
|
||||||
|
if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -459,7 +484,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
||||||
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
|
||||||
if (pDbRaw == NULL) return -1;
|
if (pDbRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
|
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
|
||||||
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
|
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1;
|
||||||
|
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
|
||||||
|
@ -633,8 +658,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
|
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
|
||||||
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
|
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
|
||||||
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||||
|
|
|
@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
|
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndVgroup.h"
|
|
||||||
|
|
||||||
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
if (pMsg == NULL || pMsg->pCont == NULL) {
|
if (pMsg == NULL || pMsg->pCont == NULL) {
|
||||||
|
@ -75,25 +74,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||||
|
SSdbRaw *pRaw = pAction->pRaw;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SSdbRow *pRow = NULL;
|
SSdbRow *pRow = NULL;
|
||||||
int32_t code = -1;
|
void *pObj = NULL;
|
||||||
|
int code = -1;
|
||||||
|
|
||||||
if (pAction->msgType == TDMT_MND_CREATE_VG) {
|
if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
|
||||||
pRow = mndVgroupActionDecode(pAction->pRaw);
|
|
||||||
if (pRow == NULL) goto _OUT;
|
|
||||||
|
|
||||||
SVgObj *pVgroup = sdbGetRowObj(pRow);
|
pRow = (pSdb->decodeFps[pRaw->type])(pRaw);
|
||||||
if (pVgroup == NULL) goto _OUT;
|
if (pRow == NULL) goto _OUT;
|
||||||
|
pObj = sdbGetRowObj(pRow);
|
||||||
|
if (pObj == NULL) goto _OUT;
|
||||||
|
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
|
||||||
if (maxVgId > pVgroup->vgId) {
|
code = 0;
|
||||||
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
|
if (validateFp) {
|
||||||
maxVgId);
|
code = validateFp(pMnode, pTrans, pObj);
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
|
||||||
_OUT:
|
_OUT:
|
||||||
taosMemoryFreeClear(pRow);
|
taosMemoryFreeClear(pRow);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -654,11 +654,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
return mndTransAppendAction(pTrans->commitActions, &action);
|
return mndTransAppendAction(pTrans->commitActions, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||||
pAction->stage = TRN_STAGE_PREPARE;
|
STransAction action = {
|
||||||
pAction->actionType = TRANS_ACTION_RAW;
|
.pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId};
|
||||||
pAction->mTraceId = pTrans->mTraceId;
|
return mndTransAppendAction(pTrans->prepareActions, &action);
|
||||||
return mndTransAppendAction(pTrans->prepareActions, pAction);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
|
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
|
||||||
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
|
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
|
||||||
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
|
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
|
||||||
|
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
|
||||||
|
|
||||||
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
||||||
|
@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
|
||||||
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
|
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
|
||||||
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
|
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
|
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
|
||||||
|
.validateFp = (SdbValidateFp)mndNewVgActionValidate,
|
||||||
};
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
|
||||||
|
@ -171,6 +173,17 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
|
||||||
|
SVgObj *pVgroup = pObj;
|
||||||
|
|
||||||
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
|
if (maxVgId > pVgroup->vgId) {
|
||||||
|
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
|
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
|
||||||
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
|
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1259,12 +1272,11 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
|
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
|
||||||
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
|
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
|
||||||
if (pRaw == NULL) goto _err;
|
if (pRaw == NULL) goto _err;
|
||||||
|
|
||||||
STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG};
|
if (mndTransAppendPrepareLog(pTrans, pRaw) != 0) goto _err;
|
||||||
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
|
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
||||||
pRaw = NULL;
|
pRaw = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2380,13 +2392,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
int32_t srcVgId = newVg1.vgId;
|
int32_t srcVgId = newVg1.vgId;
|
||||||
newVg1.vgId = maxVgId;
|
newVg1.vgId = maxVgId;
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
|
||||||
|
|
||||||
maxVgId++;
|
maxVgId++;
|
||||||
srcVgId = newVg2.vgId;
|
srcVgId = newVg2.vgId;
|
||||||
newVg2.vgId = maxVgId;
|
newVg2.vgId = maxVgId;
|
||||||
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
|
if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
|
||||||
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
|
|
|
@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
|
||||||
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
|
||||||
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
|
||||||
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
||||||
|
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj);
|
||||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||||
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
||||||
|
@ -189,6 +190,7 @@ typedef struct SSdb {
|
||||||
SdbDeployFp deployFps[SDB_MAX];
|
SdbDeployFp deployFps[SDB_MAX];
|
||||||
SdbEncodeFp encodeFps[SDB_MAX];
|
SdbEncodeFp encodeFps[SDB_MAX];
|
||||||
SdbDecodeFp decodeFps[SDB_MAX];
|
SdbDecodeFp decodeFps[SDB_MAX];
|
||||||
|
SdbValidateFp validateFps[SDB_MAX];
|
||||||
TdThreadMutex filelock;
|
TdThreadMutex filelock;
|
||||||
} SSdb;
|
} SSdb;
|
||||||
|
|
||||||
|
@ -207,6 +209,7 @@ typedef struct {
|
||||||
SdbInsertFp insertFp;
|
SdbInsertFp insertFp;
|
||||||
SdbUpdateFp updateFp;
|
SdbUpdateFp updateFp;
|
||||||
SdbDeleteFp deleteFp;
|
SdbDeleteFp deleteFp;
|
||||||
|
SdbValidateFp validateFp;
|
||||||
} SSdbTable;
|
} SSdbTable;
|
||||||
|
|
||||||
typedef struct SSdbOpt {
|
typedef struct SSdbOpt {
|
||||||
|
|
|
@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
pSdb->deployFps[sdbType] = table.deployFp;
|
pSdb->deployFps[sdbType] = table.deployFp;
|
||||||
pSdb->encodeFps[sdbType] = table.encodeFp;
|
pSdb->encodeFps[sdbType] = table.encodeFp;
|
||||||
pSdb->decodeFps[sdbType] = table.decodeFp;
|
pSdb->decodeFps[sdbType] = table.decodeFp;
|
||||||
|
pSdb->validateFps[sdbType] = table.validateFp;
|
||||||
|
|
||||||
int32_t hashType = 0;
|
int32_t hashType = 0;
|
||||||
if (keyType == SDB_KEY_INT32) {
|
if (keyType == SDB_KEY_INT32) {
|
||||||
|
|
|
@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) {
|
||||||
return "dropped";
|
return "dropped";
|
||||||
case SDB_STATUS_INIT:
|
case SDB_STATUS_INIT:
|
||||||
return "init";
|
return "init";
|
||||||
|
case SDB_STATUS_UPDATE:
|
||||||
|
return "update";
|
||||||
default:
|
default:
|
||||||
return "undefine";
|
return "undefine";
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,12 +17,24 @@
|
||||||
#include "tsdbUpgrade.h"
|
#include "tsdbUpgrade.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
|
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
|
||||||
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
|
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
|
||||||
|
extern void remove_file(const char *fname);
|
||||||
|
|
||||||
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
|
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
|
||||||
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
|
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
|
||||||
|
|
||||||
|
typedef struct STFileHashEntry {
|
||||||
|
struct STFileHashEntry *next;
|
||||||
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
} STFileHashEntry;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t numFile;
|
||||||
|
int32_t numBucket;
|
||||||
|
STFileHashEntry **buckets;
|
||||||
|
} STFileHash;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TSDB_FS_STATE_NONE = 0,
|
TSDB_FS_STATE_NONE = 0,
|
||||||
TSDB_FS_STATE_OPEN,
|
TSDB_FS_STATE_OPEN,
|
||||||
|
@ -315,10 +327,8 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
// static int32_t
|
// static int32_t
|
||||||
static int32_t apply_abort(STFileSystem *fs) {
|
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
|
||||||
// TODO
|
static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); }
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t abort_edit(STFileSystem *fs) {
|
static int32_t abort_edit(STFileSystem *fs) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
@ -349,6 +359,180 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// check file existence
|
||||||
|
if (!taosCheckExistFile(fobj->fname)) {
|
||||||
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // TODO: check file size
|
||||||
|
// int64_t fsize;
|
||||||
|
// if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
|
||||||
|
// code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
|
// tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(fs->tsdb->pVnode), __func__,
|
||||||
|
// fobj->fname, tstrerror(code));
|
||||||
|
// return code;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
|
||||||
|
|
||||||
|
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
|
||||||
|
STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
|
||||||
|
if (entry == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
strcpy(entry->fname, fname);
|
||||||
|
|
||||||
|
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
|
||||||
|
|
||||||
|
entry->next = hash->buckets[idx];
|
||||||
|
hash->buckets[idx] = entry;
|
||||||
|
hash->numFile++;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
||||||
|
int32_t code = 0;
|
||||||
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
// init hash table
|
||||||
|
hash->numFile = 0;
|
||||||
|
hash->numBucket = 4096;
|
||||||
|
hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
|
||||||
|
if (hash->buckets == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// vnode.json
|
||||||
|
current_fname(fs->tsdb, fname, TSDB_FCURRENT);
|
||||||
|
code = tsdbFSAddEntryToFileObjHash(hash, fname);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
// other
|
||||||
|
STFileSet *fset = NULL;
|
||||||
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
|
// data file
|
||||||
|
for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
|
||||||
|
if (fset->farr[i] != NULL) {
|
||||||
|
code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// stt file
|
||||||
|
SSttLvl *lvl = NULL;
|
||||||
|
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||||
|
STFileObj *fobj;
|
||||||
|
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||||
|
code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbFSDestroyFileObjHash(hash);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
|
||||||
|
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
|
||||||
|
|
||||||
|
STFileHashEntry *entry = hash->buckets[idx];
|
||||||
|
while (entry) {
|
||||||
|
if (strcmp(entry->fname, fname) == 0) {
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
entry = entry->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
|
||||||
|
for (int32_t i = 0; i < hash->numBucket; i++) {
|
||||||
|
STFileHashEntry *entry = hash->buckets[i];
|
||||||
|
while (entry) {
|
||||||
|
STFileHashEntry *next = entry->next;
|
||||||
|
taosMemoryFree(entry);
|
||||||
|
entry = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFree(hash->buckets);
|
||||||
|
memset(hash, 0, sizeof(*hash));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
{ // scan each file
|
||||||
|
STFileSet *fset = NULL;
|
||||||
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
|
// data file
|
||||||
|
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
|
||||||
|
if (fset->farr[ftype] == NULL) continue;
|
||||||
|
code = tsdbFSDoScanAndFixFile(fs, fset->farr[ftype]);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// stt file
|
||||||
|
SSttLvl *lvl;
|
||||||
|
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||||
|
STFileObj *fobj;
|
||||||
|
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||||
|
code = tsdbFSDoScanAndFixFile(fs, fobj);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // clear unreferenced files
|
||||||
|
STfsDir *dir = tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path);
|
||||||
|
if (dir == NULL) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
|
lino = __LINE__;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
STFileHash fobjHash = {0};
|
||||||
|
code = tsdbFSCreateFileObjHash(fs, &fobjHash);
|
||||||
|
if (code) goto _close_dir;
|
||||||
|
|
||||||
|
for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
|
||||||
|
if (taosIsDir(file->aname)) continue;
|
||||||
|
|
||||||
|
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
|
||||||
|
remove_file(file->aname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbFSDestroyFileObjHash(&fobjHash);
|
||||||
|
|
||||||
|
_close_dir:
|
||||||
|
tfsClosedir(dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
|
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
|
||||||
fs->neid = 0;
|
fs->neid = 0;
|
||||||
|
|
||||||
|
@ -356,8 +540,18 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
|
||||||
const STFileSet *fset;
|
const STFileSet *fset;
|
||||||
TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
|
TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
|
||||||
|
|
||||||
// TODO
|
// scan and fix
|
||||||
return 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
code = tsdbFSDoSanAndFix(fs);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbFSDupState(STFileSystem *fs) {
|
static int32_t tsdbFSDupState(STFileSystem *fs) {
|
||||||
|
|
|
@ -41,7 +41,7 @@ static const struct {
|
||||||
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
||||||
};
|
};
|
||||||
|
|
||||||
static void remove_file(const char *fname) {
|
void remove_file(const char *fname) {
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
tsdbInfo("file:%s is removed", fname);
|
tsdbInfo("file:%s is removed", fname);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "vndCos.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief max key by precision
|
* @brief max key by precision
|
||||||
|
@ -76,9 +77,18 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
|
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
|
||||||
TSKEY now = taosGetTimestamp(pCfg->precision);
|
TSKEY now = taosGetTimestamp(pCfg->precision);
|
||||||
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
|
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
|
||||||
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
|
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
|
||||||
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
|
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
|
||||||
|
int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
|
||||||
|
|
||||||
|
if (nlevel > 1 && tsS3Enabled) {
|
||||||
|
if (nlevel == 3) {
|
||||||
|
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
|
||||||
|
} else if (nlevel == 2) {
|
||||||
|
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
|
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "tencode.h"
|
#include "tencode.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
#include "vndCos.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
|
@ -190,7 +191,18 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
||||||
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
|
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
now *= 1000000;
|
now *= 1000000;
|
||||||
}
|
}
|
||||||
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
|
|
||||||
|
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
|
||||||
|
int32_t keep = pVnode->config.tsdbCfg.keep2;
|
||||||
|
if (nlevel > 1 && tsS3Enabled) {
|
||||||
|
if (nlevel == 3) {
|
||||||
|
keep = pVnode->config.tsdbCfg.keep1;
|
||||||
|
} else if (nlevel == 2) {
|
||||||
|
keep = pVnode->config.tsdbCfg.keep0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
|
||||||
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
|
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
|
||||||
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
uint64_t nColData;
|
uint64_t nColData;
|
||||||
|
|
|
@ -132,6 +132,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||||
|
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
||||||
|
|
|
@ -5,8 +5,10 @@ parm_path=$(pwd ${parm_path})
|
||||||
echo "execute path:${parm_path}"
|
echo "execute path:${parm_path}"
|
||||||
cd ${parm_path}
|
cd ${parm_path}
|
||||||
cp cases.task ${case_file}
|
cp cases.task ${case_file}
|
||||||
|
# comment udf and stream case in windows
|
||||||
sed -i '/udf/d' ${case_file}
|
sed -i '/udf/d' ${case_file}
|
||||||
sed -i '/Udf/d' ${case_file}
|
sed -i '/Udf/d' ${case_file}
|
||||||
|
sed -i '/stream/d' ${case_file}
|
||||||
sed -i '/^$/d' ${case_file}
|
sed -i '/^$/d' ${case_file}
|
||||||
sed -i '$a\%%FINISHED%%' ${case_file}
|
sed -i '$a\%%FINISHED%%' ${case_file}
|
||||||
|
|
||||||
|
|
|
@ -812,6 +812,15 @@ class TDDnodes:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
processID = subprocess.check_output(
|
processID = subprocess.check_output(
|
||||||
psCmd, shell=True).decode("utf-8").strip()
|
psCmd, shell=True).decode("utf-8").strip()
|
||||||
|
psCmd = "for /f %a in ('wmic process where \"name='tmq_sim'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
|
||||||
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
|
||||||
|
while(processID):
|
||||||
|
print(processID)
|
||||||
|
killCmd = "kill -9 %s > nul 2>&1" % processID
|
||||||
|
os.system(killCmd)
|
||||||
|
time.sleep(1)
|
||||||
|
processID = subprocess.check_output(
|
||||||
|
psCmd, shell=True).decode("utf-8").strip()
|
||||||
else:
|
else:
|
||||||
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
|
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
|
||||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
|
||||||
|
|
|
@ -26,8 +26,10 @@
|
||||||
./test.sh -f tsim/user/basic.sim
|
./test.sh -f tsim/user/basic.sim
|
||||||
./test.sh -f tsim/user/password.sim
|
./test.sh -f tsim/user/password.sim
|
||||||
./test.sh -f tsim/user/privilege_db.sim
|
./test.sh -f tsim/user/privilege_db.sim
|
||||||
|
./test.sh -f tsim/user/privilege_sysinfo.sim
|
||||||
./test.sh -f tsim/user/privilege_topic.sim
|
./test.sh -f tsim/user/privilege_topic.sim
|
||||||
./test.sh -f tsim/user/privilege_table.sim
|
./test.sh -f tsim/user/privilege_table.sim
|
||||||
|
./test.sh -f tsim/user/privilege_create_db.sim
|
||||||
./test.sh -f tsim/db/alter_option.sim
|
./test.sh -f tsim/db/alter_option.sim
|
||||||
./test.sh -f tsim/db/alter_replica_31.sim
|
./test.sh -f tsim/db/alter_replica_31.sim
|
||||||
./test.sh -f tsim/db/basic1.sim
|
./test.sh -f tsim/db/basic1.sim
|
||||||
|
@ -183,6 +185,7 @@
|
||||||
./test.sh -f tsim/query/scalarNull.sim
|
./test.sh -f tsim/query/scalarNull.sim
|
||||||
./test.sh -f tsim/query/session.sim
|
./test.sh -f tsim/query/session.sim
|
||||||
./test.sh -f tsim/query/join_interval.sim
|
./test.sh -f tsim/query/join_interval.sim
|
||||||
|
./test.sh -f tsim/query/join_pk.sim
|
||||||
./test.sh -f tsim/query/unionall_as_table.sim
|
./test.sh -f tsim/query/unionall_as_table.sim
|
||||||
./test.sh -f tsim/query/multi_order_by.sim
|
./test.sh -f tsim/query/multi_order_by.sim
|
||||||
./test.sh -f tsim/query/sys_tbname.sim
|
./test.sh -f tsim/query/sys_tbname.sim
|
||||||
|
@ -197,6 +200,7 @@
|
||||||
./test.sh -f tsim/query/tag_scan.sim
|
./test.sh -f tsim/query/tag_scan.sim
|
||||||
./test.sh -f tsim/query/nullColSma.sim
|
./test.sh -f tsim/query/nullColSma.sim
|
||||||
./test.sh -f tsim/query/bug3398.sim
|
./test.sh -f tsim/query/bug3398.sim
|
||||||
|
./test.sh -f tsim/query/explain_tsorder.sim
|
||||||
./test.sh -f tsim/qnode/basic1.sim
|
./test.sh -f tsim/qnode/basic1.sim
|
||||||
./test.sh -f tsim/snode/basic1.sim
|
./test.sh -f tsim/snode/basic1.sim
|
||||||
./test.sh -f tsim/mnode/basic1.sim
|
./test.sh -f tsim/mnode/basic1.sim
|
||||||
|
|
|
@ -226,7 +226,7 @@ class TDTestCase:
|
||||||
|
|
||||||
# init
|
# init
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
seed = time.clock_gettime(time.CLOCK_REALTIME)
|
seed = time.time() % 10000
|
||||||
random.seed(seed)
|
random.seed(seed)
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
|
|
@ -93,19 +93,20 @@ class TDTestCase:
|
||||||
cfgPath = tdCom.getClientCfgPath()
|
cfgPath = tdCom.getClientCfgPath()
|
||||||
taosLogFile = '%s/../log/taoslog*'%(cfgPath)
|
taosLogFile = '%s/../log/taoslog*'%(cfgPath)
|
||||||
filterResultFile = '%s/../log/filter'%(cfgPath)
|
filterResultFile = '%s/../log/filter'%(cfgPath)
|
||||||
cmdStr = 'grep "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile)
|
cmdStr = 'grep -h "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile)
|
||||||
tdLog.info(cmdStr)
|
tdLog.info(cmdStr)
|
||||||
os.system(cmdStr)
|
os.system(cmdStr)
|
||||||
|
|
||||||
consumerDict = {}
|
consumerDict = {}
|
||||||
for index, line in enumerate(open(filterResultFile,'r')):
|
for index, line in enumerate(open(filterResultFile,'r')):
|
||||||
|
|
||||||
# tdLog.info("row[%d]: %s"%(index, line))
|
# tdLog.info("row[%d]: %s"%(index, line))
|
||||||
valueList = line.split(',')
|
valueList = line.split(',')
|
||||||
# for i in range(len(valueList)):
|
# for i in range(len(valueList)):
|
||||||
# tdLog.info("index[%d]: %s"%(i, valueList[i]))
|
# tdLog.info("index[%d]: %s"%(i, valueList[i]))
|
||||||
# get consumer id
|
# get consumer id
|
||||||
list2 = valueList[0].split(':')
|
list2 = valueList[0].split(':')
|
||||||
list3 = list2[4].split()
|
list3 = list2[3].split()
|
||||||
consumerId = list3[0]
|
consumerId = list3[0]
|
||||||
print("consumerId: %s"%(consumerId))
|
print("consumerId: %s"%(consumerId))
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,139 @@
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from taos.tmq import *
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
|
self.db_name = "tmq_db"
|
||||||
|
self.topic_name = "tmq_topic"
|
||||||
|
self.stable_name = "stb"
|
||||||
|
self.rows_per_table = 1000
|
||||||
|
self.ctb_num = 100
|
||||||
|
|
||||||
|
def prepareData(self, precisionUnit="ms"):
|
||||||
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
startTS = 1672502400000
|
||||||
|
if precisionUnit == "us":
|
||||||
|
startTS = 1672502400000000
|
||||||
|
elif precisionUnit == "ns":
|
||||||
|
startTS = 1672502400000000000
|
||||||
|
|
||||||
|
paraDict = {
|
||||||
|
'dbName': self.db_name,
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'stbName': self.stable_name,
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': self.ctb_num,
|
||||||
|
'rowsPerTbl': self.rows_per_table,
|
||||||
|
'batchNum': 100,
|
||||||
|
'startTs': startTS, # 2023-01-01 00:00:00.000
|
||||||
|
'pollDelay': 3,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0
|
||||||
|
}
|
||||||
|
|
||||||
|
# init the consumer database
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
# create testing database、stable、ctables
|
||||||
|
tdCom.create_database(tdSql, paraDict["dbName"], paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, precision=precisionUnit)
|
||||||
|
tdLog.info("create database %s successfully" % paraDict["dbName"])
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"], stbName=paraDict["stbName"])
|
||||||
|
tdLog.info("create stable %s successfully" % paraDict["stbName"])
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"], ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("create child tables successfully")
|
||||||
|
|
||||||
|
# insert data into tables and wait the async thread exit
|
||||||
|
tdLog.info("insert data into tables")
|
||||||
|
pThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
pThread.join()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""Check tmq feature for different data precision unit like "ms、us、ns"
|
||||||
|
"""
|
||||||
|
precision_unit = ["ms", "us", "ns"]
|
||||||
|
for unit in precision_unit:
|
||||||
|
tdLog.info(f"start to test precision unit {unit}")
|
||||||
|
self.prepareData(precisionUnit=unit)
|
||||||
|
# drop database if exists
|
||||||
|
tdSql.execute(f"drop database if exists {self.db_name}")
|
||||||
|
self.prepareData(unit)
|
||||||
|
|
||||||
|
# create topic
|
||||||
|
tdLog.info("create topic from %s" % self.stable_name)
|
||||||
|
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(self.db_name, self.stable_name)
|
||||||
|
sqlString = "create topic %s as %s" %(self.topic_name, queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
|
# save consumer info
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = self.rows_per_table * self.ctb_num
|
||||||
|
topicList = self.topic_name
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit)
|
||||||
|
|
||||||
|
# start consume processor
|
||||||
|
paraDict = {
|
||||||
|
'pollDelay': 15,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0
|
||||||
|
}
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'], dbName=self.db_name, showMsg=paraDict['showMsg'], showRow=paraDict['showRow'], snapshot=paraDict['snapshot'])
|
||||||
|
|
||||||
|
tdLog.info("start to check consume result")
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
tdSql.query(queryString)
|
||||||
|
totalRowsFromQuery = tdSql.getRows()
|
||||||
|
tdLog.info("act consume rows: %d, act query rows: %d "%(totalConsumeRows, totalRowsFromQuery))
|
||||||
|
|
||||||
|
if totalConsumeRows < totalRowsFromQuery:
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
tmqCom.waitSubscriptionExit(tdSql, self.topic_name)
|
||||||
|
tdSql.query("drop topic %s" % self.topic_name)
|
||||||
|
tdSql.execute("drop database %s" % self.db_name)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.execute(f"drop database if exists {self.db_name}")
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -199,7 +199,7 @@ if __name__ == "__main__":
|
||||||
createDnodeNums = value
|
createDnodeNums = value
|
||||||
|
|
||||||
if key in ['-i', '--independentMnode']:
|
if key in ['-i', '--independentMnode']:
|
||||||
independentMnode = value
|
independentMnode = False
|
||||||
|
|
||||||
if key in ['-R', '--restful']:
|
if key in ['-R', '--restful']:
|
||||||
restful = True
|
restful = True
|
||||||
|
@ -553,6 +553,7 @@ if __name__ == "__main__":
|
||||||
else :
|
else :
|
||||||
# dnode > 1 cluster
|
# dnode > 1 cluster
|
||||||
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
|
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
|
||||||
|
print(independentMnode,"independentMnode valuse")
|
||||||
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
|
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
|
||||||
tdDnodes = ClusterDnodes(dnodeslist)
|
tdDnodes = ClusterDnodes(dnodeslist)
|
||||||
tdDnodes.init(deployPath, masterIp)
|
tdDnodes.init(deployPath, masterIp)
|
||||||
|
|
|
@ -17,6 +17,7 @@ python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
|
||||||
python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
|
python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
|
||||||
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
||||||
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
||||||
|
python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
||||||
python3 ./test.py -f 7-tmq/tmqShow.py
|
python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
python3 ./test.py -f 7-tmq/tmqDropStb.py
|
python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
|
@ -133,6 +134,8 @@ python3 ./test.py -f 0-others/sysinfo.py
|
||||||
python3 ./test.py -f 0-others/user_control.py
|
python3 ./test.py -f 0-others/user_control.py
|
||||||
python3 ./test.py -f 0-others/user_manage.py
|
python3 ./test.py -f 0-others/user_manage.py
|
||||||
python3 ./test.py -f 0-others/user_privilege.py
|
python3 ./test.py -f 0-others/user_privilege.py
|
||||||
|
python3 ./test.py -f 0-others/user_privilege_show.py
|
||||||
|
python3 ./test.py -f 0-others/user_privilege_all.py
|
||||||
python3 ./test.py -f 0-others/fsync.py
|
python3 ./test.py -f 0-others/fsync.py
|
||||||
python3 ./test.py -f 0-others/multilevel.py
|
python3 ./test.py -f 0-others/multilevel.py
|
||||||
python3 ./test.py -f 0-others/compatibility.py
|
python3 ./test.py -f 0-others/compatibility.py
|
||||||
|
@ -421,6 +424,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3
|
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
|
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
|
||||||
python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
|
python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
|
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
|
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 6 -M 3
|
python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 6 -M 3
|
||||||
|
|
Loading…
Reference in New Issue