diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 625546aa55..04544da80e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans); STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq, const char *opername); void mndTransDrop(STrans *pTrans); + +int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendNullLog(STrans *pTrans); -int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction); + int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 7c2f8b5b65..4dbd2fe7f8 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); -int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg); +int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg); int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid); int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 4f7e80c0a3..20b342f9e3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); + static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq); @@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndDbActionInsert, .updateFp = (SdbUpdateFp)mndDbActionUpdate, .deleteFp = (SdbDeleteFp)mndDbActionDelete, + .validateFp = (SdbValidateFp)mndNewDbActionValidate, }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); @@ -247,6 +250,19 @@ _OVER: return pRow; } +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SDbObj *pNewDb = pObj; + + SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); + if (pOldDb != NULL) { + mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); + sdbRelease(pMnode->pSdb, pOldDb); + return -1; + } + + return 0; +} + static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb); return 0; @@ -448,9 +464,18 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } -static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pDbRaw = mndDbActionEncode(pDb); + if (pDbRaw == NULL) return -1; + + if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; + return 0; +} + +static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { - if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; + if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; } return 0; } @@ -459,7 +484,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1; - if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1; for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); @@ -633,8 +658,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); - if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; - if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; + if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER; + if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e186a8742f..d666f80fd3 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mndTransSetSerial(pTrans); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); - if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 68bfe09b5e..3d8fd6220f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,7 +17,6 @@ #include "mndSync.h" #include "mndCluster.h" #include "mndTrans.h" -#include "mndVgroup.h" static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { @@ -75,25 +74,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { } static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + SSdbRaw *pRaw = pAction->pRaw; + SSdb *pSdb = pMnode->pSdb; SSdbRow *pRow = NULL; - int32_t code = -1; + void *pObj = NULL; + int code = -1; - if (pAction->msgType == TDMT_MND_CREATE_VG) { - pRow = mndVgroupActionDecode(pAction->pRaw); - if (pRow == NULL) goto _OUT; + if (pRaw->status != SDB_STATUS_CREATING) goto _OUT; - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto _OUT; + pRow = (pSdb->decodeFps[pRaw->type])(pRaw); + if (pRow == NULL) goto _OUT; + pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto _OUT; - int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); - if (maxVgId > pVgroup->vgId) { - mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId, - maxVgId); - goto _OUT; - } + SdbValidateFp validateFp = pSdb->validateFps[pRaw->type]; + code = 0; + if (validateFp) { + code = validateFp(pMnode, pTrans, pObj); } - code = 0; _OUT: taosMemoryFreeClear(pRow); return code; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index f622d49a4f..e34abb9e79 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -654,11 +654,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendAction(pTrans->commitActions, &action); } -int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) { - pAction->stage = TRN_STAGE_PREPARE; - pAction->actionType = TRANS_ACTION_RAW; - pAction->mTraceId = pTrans->mTraceId; - return mndTransAppendAction(pTrans->prepareActions, pAction); +int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = { + .pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId}; + return mndTransAppendAction(pTrans->prepareActions, &action); } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 406392271c..e0156db67c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -33,6 +33,7 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); @@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndVgroupActionInsert, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate, .deleteFp = (SdbDeleteFp)mndVgroupActionDelete, + .validateFp = (SdbValidateFp)mndNewVgActionValidate, }; mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp); @@ -171,6 +173,17 @@ _OVER: return pRow; } +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SVgObj *pVgroup = pObj; + + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); + return -1; + } + return 0; +} + static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup); return 0; @@ -1259,12 +1272,11 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb return 0; } -int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { +int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; - STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG}; - if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err; + if (mndTransAppendPrepareLog(pTrans, pRaw) != 0) goto _err; (void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); pRaw = NULL; return 0; @@ -2380,13 +2392,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t srcVgId = newVg1.vgId; newVg1.vgId = maxVgId; - if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER; maxVgId++; srcVgId = newVg2.vgId; newVg2.vgId = maxVgId; - if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3c96d8a2fd..695373d220 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); +typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); @@ -189,6 +190,7 @@ typedef struct SSdb { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; + SdbValidateFp validateFps[SDB_MAX]; TdThreadMutex filelock; } SSdb; @@ -207,6 +209,7 @@ typedef struct { SdbInsertFp insertFp; SdbUpdateFp updateFp; SdbDeleteFp deleteFp; + SdbValidateFp validateFp; } SSdbTable; typedef struct SSdbOpt { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 9797dd8337..c4b32fe87c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->deployFps[sdbType] = table.deployFp; pSdb->encodeFps[sdbType] = table.encodeFp; pSdb->decodeFps[sdbType] = table.decodeFp; + pSdb->validateFps[sdbType] = table.validateFp; int32_t hashType = 0; if (keyType == SDB_KEY_INT32) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 258b22d8ee..09743d549a 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) { return "dropped"; case SDB_STATUS_INIT: return "init"; + case SDB_STATUS_UPDATE: + return "update"; default: return "undefine"; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index afa294d3b0..7f843070d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -17,12 +17,24 @@ #include "tsdbUpgrade.h" #include "vnd.h" -extern int vnodeScheduleTask(int (*execute)(void *), void *arg); -extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); +extern int vnodeScheduleTask(int (*execute)(void *), void *arg); +extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); +extern void remove_file(const char *fname); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) +typedef struct STFileHashEntry { + struct STFileHashEntry *next; + char fname[TSDB_FILENAME_LEN]; +} STFileHashEntry; + +typedef struct { + int32_t numFile; + int32_t numBucket; + STFileHashEntry **buckets; +} STFileHash; + enum { TSDB_FS_STATE_NONE = 0, TSDB_FS_STATE_OPEN, @@ -315,10 +327,8 @@ _exit: } // static int32_t -static int32_t apply_abort(STFileSystem *fs) { - // TODO - return 0; -} +static int32_t tsdbFSDoSanAndFix(STFileSystem *fs); +static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); } static int32_t abort_edit(STFileSystem *fs) { char fname[TSDB_FILENAME_LEN]; @@ -349,6 +359,180 @@ _exit: return code; } +static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) { + int32_t code = 0; + int32_t lino = 0; + + // check file existence + if (!taosCheckExistFile(fobj->fname)) { + code = TSDB_CODE_FILE_CORRUPTED; + tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname); + return code; + } + + { // TODO: check file size + // int64_t fsize; + // if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { + // code = TAOS_SYSTEM_ERROR(terrno); + // tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(fs->tsdb->pVnode), __func__, + // fobj->fname, tstrerror(code)); + // return code; + // } + } + + return 0; +} + +static void tsdbFSDestroyFileObjHash(STFileHash *hash); + +static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) { + STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry)); + if (entry == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + strcpy(entry->fname, fname); + + uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket; + + entry->next = hash->buckets[idx]; + hash->buckets[idx] = entry; + hash->numFile++; + + return 0; +} + +static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN]; + + // init hash table + hash->numFile = 0; + hash->numBucket = 4096; + hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *)); + if (hash->buckets == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + // vnode.json + current_fname(fs->tsdb, fname, TSDB_FCURRENT); + code = tsdbFSAddEntryToFileObjHash(hash, fname); + if (code) goto _exit; + + // other + STFileSet *fset = NULL; + TARRAY2_FOREACH(fs->fSetArr, fset) { + // data file + for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) { + if (fset->farr[i] != NULL) { + code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname); + if (code) goto _exit; + } + } + + // stt file + SSttLvl *lvl = NULL; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj *fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname); + if (code) goto _exit; + } + } + } + +_exit: + if (code) { + tsdbFSDestroyFileObjHash(hash); + } + return code; +} + +static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) { + uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket; + + STFileHashEntry *entry = hash->buckets[idx]; + while (entry) { + if (strcmp(entry->fname, fname) == 0) { + return entry; + } + entry = entry->next; + } + + return NULL; +} + +static void tsdbFSDestroyFileObjHash(STFileHash *hash) { + for (int32_t i = 0; i < hash->numBucket; i++) { + STFileHashEntry *entry = hash->buckets[i]; + while (entry) { + STFileHashEntry *next = entry->next; + taosMemoryFree(entry); + entry = next; + } + } + taosMemoryFree(hash->buckets); + memset(hash, 0, sizeof(*hash)); +} + +static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { + int32_t code = 0; + int32_t lino = 0; + + { // scan each file + STFileSet *fset = NULL; + TARRAY2_FOREACH(fs->fSetArr, fset) { + // data file + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if (fset->farr[ftype] == NULL) continue; + code = tsdbFSDoScanAndFixFile(fs, fset->farr[ftype]); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // stt file + SSttLvl *lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj *fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + code = tsdbFSDoScanAndFixFile(fs, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + } + + { // clear unreferenced files + STfsDir *dir = tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path); + if (dir == NULL) { + code = TAOS_SYSTEM_ERROR(terrno); + lino = __LINE__; + goto _exit; + } + + STFileHash fobjHash = {0}; + code = tsdbFSCreateFileObjHash(fs, &fobjHash); + if (code) goto _close_dir; + + for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) { + if (taosIsDir(file->aname)) continue; + + if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { + remove_file(file->aname); + } + } + + tsdbFSDestroyFileObjHash(&fobjHash); + + _close_dir: + tfsClosedir(dir); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbFSScanAndFix(STFileSystem *fs) { fs->neid = 0; @@ -356,8 +540,18 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) { const STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); } - // TODO - return 0; + // scan and fix + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFSDoSanAndFix(fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); + } + return code; } static int32_t tsdbFSDupState(STFileSystem *fs) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index be021169cd..585316469a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -41,7 +41,7 @@ static const struct { [TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json}, }; -static void remove_file(const char *fname) { +void remove_file(const char *fname) { taosRemoveFile(fname); tsdbInfo("file:%s is removed", fname); } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 6e89b47adc..5949b103d5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#include "vndCos.h" /** * @brief max key by precision @@ -76,9 +77,18 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { int32_t code = 0; STsdbKeepCfg *pCfg = &pTsdb->keepCfg; TSKEY now = taosGetTimestamp(pCfg->precision); - TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1; + TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; int32_t size = taosArrayGetSize(pMsg->aSubmitTbData); + int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); + + if (nlevel > 1 && tsS3Enabled) { + if (nlevel == 3) { + minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1; + } else if (nlevel == 2) { + minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0; + } + } for (int32_t i = 0; i < size; ++i) { SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1039c92e5b..7029cc1af4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,6 +16,7 @@ #include "tencode.h" #include "tmsg.h" #include "vnd.h" +#include "vndCos.h" #include "vnode.h" #include "vnodeInt.h" @@ -190,7 +191,18 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) { now *= 1000000; } - TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2; + + int32_t nlevel = tfsGetLevel(pVnode->pTfs); + int32_t keep = pVnode->config.tsdbCfg.keep2; + if (nlevel > 1 && tsS3Enabled) { + if (nlevel == 3) { + keep = pVnode->config.tsdbCfg.keep1; + } else if (nlevel == 2) { + keep = pVnode->config.tsdbCfg.keep0; + } + } + + TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep; TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { uint64_t nColData; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 2a744f4d8e..f802f1cd7c 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -132,6 +132,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py +,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 diff --git a/tests/parallel_test/split_case.sh b/tests/parallel_test/split_case.sh index 4e2c535faf..e237cbf984 100755 --- a/tests/parallel_test/split_case.sh +++ b/tests/parallel_test/split_case.sh @@ -5,8 +5,10 @@ parm_path=$(pwd ${parm_path}) echo "execute path:${parm_path}" cd ${parm_path} cp cases.task ${case_file} +# comment udf and stream case in windows sed -i '/udf/d' ${case_file} sed -i '/Udf/d' ${case_file} +sed -i '/stream/d' ${case_file} sed -i '/^$/d' ${case_file} sed -i '$a\%%FINISHED%%' ${case_file} diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 89e3df81b9..8633fc660f 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -812,6 +812,15 @@ class TDDnodes: time.sleep(1) processID = subprocess.check_output( psCmd, shell=True).decode("utf-8").strip() + psCmd = "for /f %a in ('wmic process where \"name='tmq_sim'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() + while(processID): + print(processID) + killCmd = "kill -9 %s > nul 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8").strip() else: psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() diff --git a/tests/script/win-test-file b/tests/script/win-test-file index 901bad269a..4ff4b52f7e 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -26,8 +26,10 @@ ./test.sh -f tsim/user/basic.sim ./test.sh -f tsim/user/password.sim ./test.sh -f tsim/user/privilege_db.sim +./test.sh -f tsim/user/privilege_sysinfo.sim ./test.sh -f tsim/user/privilege_topic.sim ./test.sh -f tsim/user/privilege_table.sim +./test.sh -f tsim/user/privilege_create_db.sim ./test.sh -f tsim/db/alter_option.sim ./test.sh -f tsim/db/alter_replica_31.sim ./test.sh -f tsim/db/basic1.sim @@ -183,6 +185,7 @@ ./test.sh -f tsim/query/scalarNull.sim ./test.sh -f tsim/query/session.sim ./test.sh -f tsim/query/join_interval.sim +./test.sh -f tsim/query/join_pk.sim ./test.sh -f tsim/query/unionall_as_table.sim ./test.sh -f tsim/query/multi_order_by.sim ./test.sh -f tsim/query/sys_tbname.sim @@ -197,6 +200,7 @@ ./test.sh -f tsim/query/tag_scan.sim ./test.sh -f tsim/query/nullColSma.sim ./test.sh -f tsim/query/bug3398.sim +./test.sh -f tsim/query/explain_tsorder.sim ./test.sh -f tsim/qnode/basic1.sim ./test.sh -f tsim/snode/basic1.sim ./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/system-test/1-insert/precisionNS.py b/tests/system-test/1-insert/precisionNS.py index be8f1e21dc..11d79180a9 100644 --- a/tests/system-test/1-insert/precisionNS.py +++ b/tests/system-test/1-insert/precisionNS.py @@ -226,7 +226,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/7-tmq/tmqClientConsLog.py b/tests/system-test/7-tmq/tmqClientConsLog.py index 7f755726ce..d708e6642c 100644 --- a/tests/system-test/7-tmq/tmqClientConsLog.py +++ b/tests/system-test/7-tmq/tmqClientConsLog.py @@ -93,19 +93,20 @@ class TDTestCase: cfgPath = tdCom.getClientCfgPath() taosLogFile = '%s/../log/taoslog*'%(cfgPath) filterResultFile = '%s/../log/filter'%(cfgPath) - cmdStr = 'grep "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile) + cmdStr = 'grep -h "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile) tdLog.info(cmdStr) os.system(cmdStr) consumerDict = {} for index, line in enumerate(open(filterResultFile,'r')): + # tdLog.info("row[%d]: %s"%(index, line)) valueList = line.split(',') # for i in range(len(valueList)): # tdLog.info("index[%d]: %s"%(i, valueList[i])) # get consumer id list2 = valueList[0].split(':') - list3 = list2[4].split() + list3 = list2[3].split() consumerId = list3[0] print("consumerId: %s"%(consumerId)) diff --git a/tests/system-test/7-tmq/tmqDataPrecisionUnit.py b/tests/system-test/7-tmq/tmqDataPrecisionUnit.py new file mode 100644 index 0000000000..f050116a1b --- /dev/null +++ b/tests/system-test/7-tmq/tmqDataPrecisionUnit.py @@ -0,0 +1,139 @@ +import sys +import re +import time +import threading +from taos.tmq import * +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + self.db_name = "tmq_db" + self.topic_name = "tmq_topic" + self.stable_name = "stb" + self.rows_per_table = 1000 + self.ctb_num = 100 + + def prepareData(self, precisionUnit="ms"): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + startTS = 1672502400000 + if precisionUnit == "us": + startTS = 1672502400000000 + elif precisionUnit == "ns": + startTS = 1672502400000000000 + + paraDict = { + 'dbName': self.db_name, + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': self.stable_name, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': self.ctb_num, + 'rowsPerTbl': self.rows_per_table, + 'batchNum': 100, + 'startTs': startTS, # 2023-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0 + } + + # init the consumer database + tmqCom.initConsumerTable() + + # create testing database、stable、ctables + tdCom.create_database(tdSql, paraDict["dbName"], paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, precision=precisionUnit) + tdLog.info("create database %s successfully" % paraDict["dbName"]) + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"], stbName=paraDict["stbName"]) + tdLog.info("create stable %s successfully" % paraDict["stbName"]) + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"], ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create child tables successfully") + + # insert data into tables and wait the async thread exit + tdLog.info("insert data into tables") + pThread = tmqCom.asyncInsertDataByInterlace(paraDict) + pThread.join() + + def run(self): + """Check tmq feature for different data precision unit like "ms、us、ns" + """ + precision_unit = ["ms", "us", "ns"] + for unit in precision_unit: + tdLog.info(f"start to test precision unit {unit}") + self.prepareData(precisionUnit=unit) + # drop database if exists + tdSql.execute(f"drop database if exists {self.db_name}") + self.prepareData(unit) + + # create topic + tdLog.info("create topic from %s" % self.stable_name) + queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(self.db_name, self.stable_name) + sqlString = "create topic %s as %s" %(self.topic_name, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # save consumer info + consumerId = 0 + expectrowcnt = self.rows_per_table * self.ctb_num + topicList = self.topic_name + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit) + + # start consume processor + paraDict = { + 'pollDelay': 15, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0 + } + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'], dbName=self.db_name, showMsg=paraDict['showMsg'], showRow=paraDict['showRow'], snapshot=paraDict['snapshot']) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsFromQuery = tdSql.getRows() + tdLog.info("act consume rows: %d, act query rows: %d "%(totalConsumeRows, totalRowsFromQuery)) + + if totalConsumeRows < totalRowsFromQuery: + tdLog.exit("tmq consume rows error!") + + tmqCom.waitSubscriptionExit(tdSql, self.topic_name) + tdSql.query("drop topic %s" % self.topic_name) + tdSql.execute("drop database %s" % self.db_name) + + def stop(self): + tdSql.execute(f"drop database if exists {self.db_name}") + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 7082bb0f22..1c50e5bbbe 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -199,7 +199,7 @@ if __name__ == "__main__": createDnodeNums = value if key in ['-i', '--independentMnode']: - independentMnode = value + independentMnode = False if key in ['-R', '--restful']: restful = True @@ -553,6 +553,7 @@ if __name__ == "__main__": else : # dnode > 1 cluster tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) + print(independentMnode,"independentMnode valuse") dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode) tdDnodes = ClusterDnodes(dnodeslist) tdDnodes.init(deployPath, masterIp) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 0f644666cb..adea684ef0 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -17,6 +17,7 @@ python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 +python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqDropStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py @@ -133,6 +134,8 @@ python3 ./test.py -f 0-others/sysinfo.py python3 ./test.py -f 0-others/user_control.py python3 ./test.py -f 0-others/user_manage.py python3 ./test.py -f 0-others/user_privilege.py +python3 ./test.py -f 0-others/user_privilege_show.py +python3 ./test.py -f 0-others/user_privilege_all.py python3 ./test.py -f 0-others/fsync.py python3 ./test.py -f 0-others/multilevel.py python3 ./test.py -f 0-others/compatibility.py @@ -421,6 +424,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 6 -M 3