diff --git a/include/util/tdef.h b/include/util/tdef.h index 5befa6a67f..7c7413a421 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -287,7 +287,7 @@ typedef enum ELogicConditionType { #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MIN_VNODES_PER_DB 1 -#define TSDB_MAX_VNODES_PER_DB 4096 +#define TSDB_MAX_VNODES_PER_DB 1024 #define TSDB_DEFAULT_VN_PER_DB 2 #define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB #define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 455f204542..b8844390d2 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -108,13 +108,11 @@ static const SSysDbTableSchema userFuncSchema[] = { }; static const SSysDbTableSchema userIdxSchema[] = { + {.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "index_database", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "column_name", .bytes = SYSTABLE_SCH_COL_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; static const SSysDbTableSchema userStbsSchema[] = { @@ -313,7 +311,7 @@ static const SSysDbTableSchema querySchema[] = { {.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, - {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, @@ -329,17 +327,17 @@ static const SSysDbTableSchema appSchema[] = { {.name = "app_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "ip", .bytes = TSDB_IPv4ADDR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "start_time", .bytes = 8 , .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "insert_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "insert_row", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "insert_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "insert_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "fetch_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "query_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "show_query", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "total_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, - {.name = "current_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "insert_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "insert_row", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "insert_time", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "insert_bytes", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "fetch_bytes", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "query_time", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "show_query", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "total_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "current_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT}, {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; @@ -353,8 +351,7 @@ static const SSysTableMeta perfsMeta[] = { {TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)}, {TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)}, {TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)}, - {TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)} -}; + {TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}}; void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) { *pInfosTableMeta = infosMeta; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c7db4b8e1c..4f958de121 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -549,18 +549,33 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED; if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) { +#if 1 + terrno = TSDB_CODE_OPS_NOT_SUPPORT; + return terrno; +#else pDb->cfg.buffer = pAlter->buffer; terrno = 0; +#endif } if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) { +#if 1 + terrno = TSDB_CODE_OPS_NOT_SUPPORT; + return terrno; +#else pDb->cfg.pages = pAlter->pages; terrno = 0; +#endif } if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) { +#if 1 + terrno = TSDB_CODE_OPS_NOT_SUPPORT; + return terrno; +#else pDb->cfg.pageSize = pAlter->pageSize; terrno = 0; +#endif } if (pAlter->daysPerFile > 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) { @@ -594,8 +609,12 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { } if (pAlter->strict >= 0 && pAlter->strict != pDb->cfg.strict) { +#if 1 + terrno = TSDB_CODE_OPS_NOT_SUPPORT; +#else pDb->cfg.strict = pAlter->strict; terrno = 0; +#endif } if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) { @@ -604,9 +623,13 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) { } if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) { +#if 1 + terrno = TSDB_CODE_OPS_NOT_SUPPORT; +#else pDb->cfg.replications = pAlter->replications; pDb->vgVersion++; terrno = 0; +#endif } return terrno; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 5e78d0f434..8198445753 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -758,6 +758,11 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { } } + if (numOfVnodes > 0) { + terrno = TSDB_CODE_OPS_NOT_SUPPORT; + goto _OVER; + } + code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 6014adbe95..2a8cbc4425 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -71,7 +71,7 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) { type = TSDB_MGMT_TABLE_FUNC; } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) { - // type = TSDB_MGMT_TABLE_INDEX; + type = TSDB_MGMT_TABLE_INDEX; } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) { type = TSDB_MGMT_TABLE_STB; } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 6aea1f41d1..da5b8cb48e 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1147,29 +1147,32 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc SName smaName = {0}; tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName)); - char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(n, (char *)tNameGetTableName(&smaName)); - cols++; + char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(n2, (char *)mndGetDbStr(pDb->name)); SName stbName = {0}; tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - - char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName)); + char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName)); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)n, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)n1, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)n2, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)n3, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false); + numOfRows++; sdbRelease(pSdb, pSma); } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 2dd8592bf8..d18a233d29 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -273,6 +273,9 @@ _OVER: } static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) { +#if 1 + return TSDB_CODE_OPS_NOT_SUPPORT; +#else SMnode *pMnode = pReq->info.node; int32_t code = -1; SSnodeObj *pObj = NULL; @@ -315,6 +318,7 @@ _OVER: mndReleaseSnode(pMnode, pObj); mndReleaseDnode(pMnode, pDnode); return code; +#endif } static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) { @@ -386,9 +390,12 @@ _OVER: } static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SSnodeObj *pObj = NULL; +#if 1 + return TSDB_CODE_OPS_NOT_SUPPORT; +#else + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SSnodeObj *pObj = NULL; SMDropSnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -422,6 +429,7 @@ _OVER: mndReleaseSnode(pMnode, pObj); return code; +#endif } static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 08e108803a..c2125f75f8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -247,7 +247,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); pObj->status = 0; - // TODO pObj->igExpired = pCreate->igExpired; pObj->trigger = pCreate->triggerType; pObj->triggerParam = pCreate->maxDelay; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 5ca945624d..c53e831e84 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1219,6 +1219,9 @@ _OVER: } static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { +#if 1 + return TSDB_CODE_OPS_NOT_SUPPORT; +#else SMnode *pMnode = pReq->info.node; SDnodeObj *pNew1 = NULL; SDnodeObj *pNew2 = NULL; @@ -1412,6 +1415,7 @@ _OVER: mndReleaseDb(pMnode, pDb); return code; +#endif } int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) { @@ -1711,6 +1715,9 @@ _OVER: } static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { +#if 1 + return TSDB_CODE_OPS_NOT_SUPPORT; +#else SMnode *pMnode = pReq->info.node; int32_t code = -1; SArray *pArray = NULL; @@ -1759,6 +1766,7 @@ _OVER: taosArrayDestroy(pArray); return code; +#endif } bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index a1bab5d1d4..a3d129c7c4 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -93,7 +93,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); + ASSERT_EQ(pRsp->code, TSDB_CODE_OPS_NOT_SUPPORT); } test.SendShowReq(TSDB_MGMT_TABLE_DB, "user_databases", ""); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index a95eb395c2..7f7b3fa885 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -62,12 +62,10 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; - int64_t refId; // shared by persistence/fetch tasks - void *tmrHandle; // for persistence task - tmr_h tmrId; // for persistence task - int32_t tmrSeconds; // for persistence task - int8_t triggerStat; // for persistence task - int8_t runningStat; // for persistence task + int64_t refId; // shared by fetch tasks + void *tmrHandle; // shared by fetch tasks + int8_t triggerStat; // shared by fetch tasks + int8_t runningStat; // for persistence task SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; @@ -82,7 +80,6 @@ struct SSmaStat { #define SMA_TSMA_STAT(s) (&(s)->tsmaStat) #define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) -#define RSMA_TMR_ID(r) ((r)->tmrId) #define RSMA_TMR_HANDLE(r) ((r)->tmrHandle) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_RUNNING_STAT(r) (&(r)->runningStat) @@ -185,9 +182,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SRSmaInfo *pInfo); +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaRestoreImpl(SSma *pSma); + int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); @@ -244,8 +243,8 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); void tdCloseTFile(STFile *pTFile); void tdDestroyTFile(STFile *pTFile); -void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName); -void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName); +void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, char *outputName); +void tdGetVndDirName(int32_t vgId,const char *pdname, const char *dname, bool endWithSep, char *outputName); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8023567a40..4f81e9d62a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -163,8 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool // sma int32_t smaOpen(SVnode* pVnode); -int32_t smaCloseEnv(SSma* pSma); -int32_t smaCloseEx(SSma* pSma); +int32_t smaClose(SSma* pSma); +int32_t smaBegin(SSma* pSma); int32_t smaPreCommit(SSma* pSma); int32_t smaCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index b98f9bc866..30299e8792 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -43,13 +43,48 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } */ int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } +/** + * @brief set rsma trigger stat active + * + * @param pSma + * @return int32_t + */ +int32_t smaBegin(SSma *pSma) { + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + + int8_t rsmaTriggerStat = + atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); + switch (rsmaTriggerStat) { + case TASK_TRIGGER_STAT_PAUSED: { + smaDebug("vgId:%d rsma trigger stat from paused to active", SMA_VID(pSma)); + break; + } + case TASK_TRIGGER_STAT_INIT: { + atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); + smaDebug("vgId:%d rsma trigger stat from init to active", SMA_VID(pSma)); + break; + } + default: { + atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); + smaWarn("vgId:%d rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat); + ASSERT(0); + break; + } + } + return TSDB_CODE_SUCCESS; +} + /** * @brief pre-commit for rollup sma. * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. * 2) perform persist task for qTaskInfo * 3) wait all triggered fetch tasks finished - * 4) set trigger stat of rsma timer TASK_TRIGGER_STAT_ACTIVE. - * 5) finish * * @param pSma * @return int32_t @@ -63,10 +98,30 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - // step 1 + + // step 1: set persistence task paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - // step 2 + // step 2: perform persist task for qTaskInfo + tdRSmaPersistExecImpl(pRSmaStat); + + // step 3: wait all triggered fetch tasks finished + int32_t nLoops = 0; + while (1) { + if (T_REF_VAL_GET(pStat) == 0) { + smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + break; + } else { + smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + } + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + + smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma)); return TSDB_CODE_SUCCESS; } @@ -103,48 +158,68 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { TdDirPtr pDir = NULL; TdDirEntryPtr pDirEntry = NULL; char dir[TSDB_FILENAME_LEN]; - char bname[TSDB_FILENAME_LEN]; - const char *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$"; + const char *pattern = "v[0-9]+qtaskinfo\\.ver([0-9]+)?$"; regex_t regex; + int code = 0; - tdGetVndDirName(TD_VID(pVnode), VNODE_RSMA_DIR, dir); + tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); // Resource allocation and init - regcomp(®ex, pattern, REG_EXTENDED); - - if ((pDir = taosOpenDir(dir)) == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - smaWarn("rsma post-commit open dir %s failed since %s", dir, terrstr()); + if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) { + char errbuf[128]; + regerror(code, ®ex, errbuf, sizeof(errbuf)); + smaWarn("vgId:%d, rsma post commit, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf); return TSDB_CODE_FAILED; } + if ((pDir = taosOpenDir(dir)) == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); + return TSDB_CODE_FAILED; + } + + int32_t dirLen = strlen(dir); + char *dirEnd = POINTER_SHIFT(dir, dirLen); regmatch_t regMatch[2]; while ((pDirEntry = taosReadDir(pDir)) != NULL) { char *entryName = taosGetDirEntryName(pDirEntry); if (!entryName) { continue; } - char *fileName = taosDirEntryBaseName(entryName); - int code = regexec(®ex, bname, 2, regMatch, 0); + + code = regexec(®ex, entryName, 2, regMatch, 0); if (code == 0) { // match - printf("match 0 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so)); - printf("match 1 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[1].rm_so)); + int64_t version = -1; + sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &version); + if ((version < committed) && (version > -1)) { + strncpy(dirEnd, entryName, TSDB_FILENAME_LEN - dirLen); + if (taosRemoveFile(dir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + smaWarn("vgId:%d, committed version:%" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed, + dir, terrstr()); + } else { + smaDebug("vgId:%d, committed version:%" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, dir); + } + } } else if (code == REG_NOMATCH) { // not match - smaInfo("rsma post-commit not match %s", fileName); + smaTrace("vgId:%d, rsma post commit, not match %s", TD_VID(pVnode), entryName); continue; } else { // has other error - terrno = TAOS_SYSTEM_ERROR(code); - smaWarn("rsma post-commit regexec failed since %s", terrstr()); + char errbuf[128]; + regerror(code, ®ex, errbuf, sizeof(errbuf)); + smaWarn("vgId:%d, rsma post commit, regexec failed since %s", TD_VID(pVnode), errbuf); taosCloseDir(&pDir); regfree(®ex); return TSDB_CODE_FAILED; } } + taosCloseDir(&pDir); + regfree(®ex); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 120d6612a2..c7b938f884 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -132,6 +132,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (smaType == TSDB_SMA_TYPE_ROLLUP) { SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); pRSmaStat->pSma = (SSma *)pSma; + atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); // init smaMgmt smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); @@ -192,22 +193,20 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) { static void tdDestroyRSmaStat(void *pRSmaStat) { if (pRSmaStat) { SRSmaStat *pStat = (SRSmaStat *)pRSmaStat; - smaDebug("vgId:%d %s:%d destroy rsma stat %p", SMA_VID(pStat->pSma), __func__, __LINE__, pRSmaStat); - // step 1: set persistence task cancelled + SSma *pSma = pStat->pSma; + smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat); + // step 1: set rsma trigger stat cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); - // step 2: stop the persistence timer - taosTmrStopA(&RSMA_TMR_ID(pStat)); - - // step 3: wait the persistence thread to finish + // step 2: wait the persistence thread to finish int32_t nLoops = 0; if (atomic_load_8(RSMA_RUNNING_STAT(pStat)) == 1) { while (1) { if (atomic_load_8(RSMA_TRIGGER_STAT(pStat)) == TASK_TRIGGER_STAT_FINISHED) { - smaDebug("rsma, persist task finished already"); + smaDebug("vgId:%d, rsma persist task finished already", SMA_VID(pSma)); break; } else { - smaDebug("rsma, persist task not finished yet since rsma stat in %" PRIi8, + smaDebug("vgId:%d, rsma persist task not finished yet since rsma stat in %" PRIi8, SMA_VID(pSma), atomic_load_8(RSMA_TRIGGER_STAT(pStat))); } ++nLoops; @@ -218,13 +217,15 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } } - // step 4: destroy the rsma info and associated fetch tasks + // step 3: destroy the rsma info and associated fetch tasks // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. - void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); - while (infoHash) { - SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash; - tdFreeRSmaInfo(pSmaInfo); - infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash); + if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { + void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); + while (infoHash) { + SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash; + tdFreeRSmaInfo(pSmaInfo); + infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash); + } } taosHashCleanup(RSMA_INFO_HASH(pStat)); @@ -232,10 +233,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { - smaDebug("rsma, all fetch task finished already"); + smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); break; } else { - smaDebug("rsma, fetch tasks not all finished yet"); + smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { @@ -275,7 +276,7 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) { - smaError("remove refId from smaRef failed, refId:0x%" PRIx64, RSMA_REF_ID(pRSmaStat)); + smaError("remove refId from rsmaRef:0x%" PRIx64 " failed since %s", RSMA_REF_ID(pRSmaStat), terrstr()); } } else { ASSERT(0); diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 641b8c7934..d73b03f4a2 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -135,17 +135,11 @@ _err: return -1; } -int32_t smaCloseEnv(SSma *pSma) { - if (pSma) { - SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); - SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); - } - return 0; -} - -int32_t smaCloseEx(SSma *pSma) { +int32_t smaClose(SSma *pSma) { if (pSma) { taosThreadMutexDestroy(&pSma->mutex); + SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); + SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma)); if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma)); if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ff9b17bf6e..9899440833 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,9 +15,8 @@ #include "sma.h" -#define RSMA_QTASKINFO_PERSIST_MS 7200000 -#define RSMA_QTASKINFO_BUFSIZE 32768 -#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid +#define RSMA_QTASKINFO_BUFSIZE 32768 +#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid SSmaMgmt smaMgmt = { .smaRef = -1, @@ -43,9 +42,9 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); -static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma); -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma); -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); +static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed); +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; @@ -88,7 +87,7 @@ struct SRSmaQTaskInfoIter { }; static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) { - tdGetVndFileName(vgId, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); + tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); } static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { @@ -114,12 +113,14 @@ void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (pItem->taskInfo) { - smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, - i + 1); - taosTmrStopA(&pItem->tmrId); + if (pItem->tmrId) { + smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, + i + 1); + taosTmrStopA(&pItem->tmrId); + } tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1); } else { - smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), + smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), pInfo->suid, i + 1); } } @@ -358,13 +359,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con goto _err; } - smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); - - // start the persist timer - if (TASK_TRIGGER_STAT_INIT == - atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) { - taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat)); - } + smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; _err: @@ -748,7 +743,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } -static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { +static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { SVnode *pVnode = pSma->pVnode; SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); @@ -758,7 +753,12 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { return TSDB_CODE_FAILED; } - int32_t arrSize = taosArrayGetSize(suidList); + int64_t arrSize = taosArrayGetSize(suidList); + + if (nTables) { + *nTables = arrSize; + } + if (arrSize == 0) { taosArrayDestroy(suidList); smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); @@ -767,9 +767,9 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { SMetaReader mr = {0}; metaReaderInit(&mr, SMA_META(pSma), 0); - for (int32_t i = 0; i < arrSize; ++i) { + for (int64_t i = 0; i < arrSize; ++i) { tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); - smaDebug("vgId:%d, rsma restore, suid[%d] is %" PRIi64, TD_VID(pVnode), i, suid); + smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid); if (metaGetTableEntryByUid(&mr, suid) < 0) { smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); @@ -803,7 +803,7 @@ _err: return TSDB_CODE_FAILED; } -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { SVnode *pVnode = pSma->pVnode; STFile tFile = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN] = {0}; @@ -814,6 +814,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { } if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { + if (pVnode->state.committed > 0) { + smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode), + pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); + } else { + smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode), + pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); + } return TSDB_CODE_SUCCESS; } @@ -839,9 +846,14 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { tdRSmaQTaskInfoIterDestroy(&fIter); tdCloseTFile(&tFile); tdDestroyTFile(&tFile); + + // restored successfully from committed + *committed = pVnode->state.committed; + return TSDB_CODE_SUCCESS; _err: - smaError("rsma restore, qtaskinfo reload failed since %s", terrstr()); + smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode), + pVnode->state.committed, terrstr()); return TSDB_CODE_FAILED; } @@ -849,35 +861,45 @@ _err: * @brief reload ts data from checkpoint * * @param pSma + * @param committed restore from committed version * @return int32_t */ -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) { // TODO + smaDebug("vgId:%d, rsma restore from %" PRIi64 ", ts data reload success", SMA_VID(pSma), committed); return TSDB_CODE_SUCCESS; _err: - smaError("rsma restore, ts data reload failed since %s", terrstr()); + smaError("vgId:%d, rsma restore from %" PRIi64 ", ts data reload failed since %s", SMA_VID(pSma), committed, + terrstr()); return TSDB_CODE_FAILED; } int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { // step 1: iterate all stables to restore the rsma env - if (tdRSmaRestoreQTaskInfoInit(pSma) < 0) { + int64_t nTables = 0; + if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { goto _err; } + if (nTables <= 0) { + smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma)); + return TSDB_CODE_SUCCESS; + } + // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore - if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) { + int64_t committed = -1; + if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) { goto _err; } // step 3: reload ts data from checkpoint - if (tdRSmaRestoreTSDataReload(pSma) < 0) { + if (tdRSmaRestoreTSDataReload(pSma, committed) < 0) { goto _err; } return TSDB_CODE_SUCCESS; _err: - smaError("failed to restore rsma task since %s", terrstr()); + smaError("vgId:%d failed to restore rsma task since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } @@ -1017,7 +1039,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - smaError("restore rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); + smaError("vgId:%d, restore rsma qtaskinfo file %s failed since %s", SMA_VID(pSma), + TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); return TSDB_CODE_FAILED; } @@ -1054,13 +1077,17 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { return TSDB_CODE_SUCCESS; } -static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; int32_t vid = SMA_VID(pSma); int64_t toffset = 0; bool isFileCreated = false; + if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { + return TSDB_CODE_SUCCESS; + } + void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); if (!infoHash) { return TSDB_CODE_SUCCESS; @@ -1099,11 +1126,15 @@ static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { + smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } if (tdCreateTFile(&tFile, true, -1) < 0) { + smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } + smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, + i + 1, TD_TFILE_FULL_NAME(&tFile)); isFileCreated = true; } @@ -1143,6 +1174,7 @@ static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { } return TSDB_CODE_SUCCESS; _err: + smaError("vgId:%d, rsma persit failed since %s", vid, terrstr()); if (isFileCreated) { tdRemoveTFile(&tFile); tdDestroyTFile(&tFile); @@ -1238,8 +1270,8 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { // start persist task tdRSmaPersistTask(pRSmaStat); - taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, - &pRSmaStat->tmrId); + // taosTmrReset(tdRSmaPersistTrigger, 5000, pRSmaStat, pRSmaStat->tmrHandle, + // RSMA_TMR_ID(pRSmaStat)); } else { atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 9bea3ff468..14caf4144e 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -140,7 +140,7 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) return -1; } -#if 1 +#if 0 smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset, nbyte, toffset + nbyte); #endif @@ -181,16 +181,43 @@ void tdCloseTFile(STFile *pTFile) { void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); } -void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName) { +void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, + char *outputName) { if (version < 0) { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s", vgId, dname, vgId, fname); + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, + TD_DIRSEP, dname, TD_DIRSEP, vgId, fname); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, + vgId, fname); + } } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s%" PRIi64, vgId, dname, vgId, fname, version); + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP, + vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname, + TD_DIRSEP, vgId, fname, version); + } } } -void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName) { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", vgId, dname); +void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { + if (pdname) { + if (endWithSep) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, + dname, TD_DIRSEP); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, + dname); + } + } else { + if (endWithSep) { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, dname); + } + } } int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { @@ -215,35 +242,36 @@ int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) { ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pTFile->pFile == NULL) { if (errno == ENOENT) { // Try to create directory recursively - if (taosMulMkDir(taosDirName(TD_TFILE_FULL_NAME(pTFile))) != 0) { + char *s = strdup(TD_TFILE_FULL_NAME(pTFile)); + if (taosMulMkDir(taosDirName(s)) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(s); + return -1; + } + taosMemoryFree(s); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pTFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; - } else { - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pTFile->pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } } } + } - if (!updateHeader) { - return 0; - } + if (!updateHeader) { + return 0; + } - pTFile->info.fsize += TD_FILE_HEAD_SIZE; - pTFile->info.fver = 0; + pTFile->info.fsize += TD_FILE_HEAD_SIZE; + pTFile->info.fver = 0; - if (tdUpdateTFileHeader(pTFile) < 0) { - tdCloseTFile(pTFile); - tdRemoveTFile(pTFile); - return -1; - } + if (tdUpdateTFileHeader(pTFile) < 0) { + tdCloseTFile(pTFile); + tdRemoveTFile(pTFile); + return -1; } return 0; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7c4a7d520d..5ce3cfab45 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -406,193 +406,6 @@ OVER: return code; } -#if 0 -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { - SMqPollReq* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int64_t timeout = pReq->timeout; - int32_t reqEpoch = pReq->epoch; - int64_t fetchOffset; - int32_t code = 0; - - // get offset to fetch message - if (pReq->currentOffset >= 0) { - fetchOffset = pReq->currentOffset + 1; - } else { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); - if (pOffset != NULL) { - ASSERT(pOffset->val.type == TMQ_OFFSET__LOG); - tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey, - TD_VID(pTq->pVnode), pOffset->val.version); - fetchOffset = pOffset->val.version + 1; - } else { - if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { - fetchOffset = walGetFirstVer(pTq->pWal); - } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { - fetchOffset = walGetCommittedVer(pTq->pWal); - } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) { - tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode), - pReq->subKey); - terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; - return -1; - } - tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey, - TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); - } - } - - tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId, - pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); - - STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); - /*ASSERT(pHandle);*/ - if (pHandle == NULL) { - tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode), - pReq->subKey); - return -1; - } - - if (pHandle->consumerId != consumerId) { - tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", - consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId); - return -1; - } - - int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); - while (consumerEpoch < reqEpoch) { - consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); - } - - SMqDataBlkRsp rsp = {0}; - rsp.reqOffset = pReq->currentOffset; - - rsp.blockData = taosArrayInit(0, sizeof(void*)); - rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); - - if (rsp.blockData == NULL || rsp.blockDataLen == NULL) { - return -1; - } - - rsp.withTbName = pReq->withTbName; - if (rsp.withTbName) { - rsp.blockTbName = taosArrayInit(0, sizeof(void*)); - } - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - rsp.withSchema = false; - } else { - rsp.withSchema = true; - rsp.blockSchema = taosArrayInit(0, sizeof(void*)); - } - -#if 1 - if (pReq->useSnapshot) { - // TODO set ver into snapshot - int64_t lastVer = walGetCommittedVer(pTq->pWal); - if (rsp.reqOffset < lastVer) { - tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer); - tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId); - - if (rsp.blockNum != 0) { - rsp.withTbName = false; - rsp.rspOffset = lastVer; - tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset); - fetchOffset = lastVer; - goto SEND_RSP; - } - } - } -#endif - - SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048); - if (pHeadWithCkSum == NULL) { - return -1; - } - - walSetReaderCapacity(pHandle->pWalReader, 2048); - - while (1) { - consumerEpoch = atomic_load_32(&pHandle->epoch); - if (consumerEpoch > reqEpoch) { - tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d", - consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); - break; - } - - if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) { - // TODO add push mgr - break; - } - - SWalCont* pHead = &pHeadWithCkSum->head; - - tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); - - if (pHead->msgType == TDMT_VND_SUBMIT) { - SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - - if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) { - /*ASSERT(0);*/ - } - } else { - ASSERT(pHandle->fetchMeta); - ASSERT(IS_META_MSG(pHead->msgType)); - tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); - SMqMetaRsp metaRsp = {0}; - metaRsp.reqOffset = pReq->currentOffset; - metaRsp.rspOffset = fetchOffset; - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { - code = -1; - goto OVER; - } - code = 0; - goto OVER; - } - - // TODO batch optimization: - // TODO continue scan until meeting batch requirement - if (rsp.blockNum > 0 /* threshold */) { - break; - } else { - fetchOffset++; - } - } - - taosMemoryFree(pHeadWithCkSum); - -SEND_RSP: - ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); - ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); - if (rsp.withSchema) { - ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum); - } - - rsp.rspOffset = fetchOffset; - - if (tqSendDataRsp(pTq, pMsg, pReq, &rsp) < 0) { - code = -1; - } -OVER: - // TODO wrap in destroy func - taosArrayDestroy(rsp.blockDataLen); - taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree); - - if (rsp.withSchema) { - taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); - } - - if (rsp.withTbName) { - taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); - } - - return code; -} -#endif - int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index a12e11b398..40112a1ee8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -69,6 +69,9 @@ int vnodeBegin(SVnode *pVnode) { } } + // begin sma + smaBegin(pVnode->pSma); // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged + return 0; } @@ -230,8 +233,8 @@ int vnodeCommit(SVnode *pVnode) { } // preCommit - // TODO - + smaPreCommit(pVnode->pSma); + // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { ASSERT(0); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 57d7386667..0c654bee1f 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -152,12 +152,11 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return pVnode; _err: - if (pVnode->pSma) smaCloseEnv(pVnode->pSma); if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); - if (pVnode->pSma) smaCloseEx(pVnode->pSma); + if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); tsem_destroy(&(pVnode->canCommit)); @@ -167,14 +166,13 @@ _err: void vnodeClose(SVnode *pVnode) { if (pVnode) { - smaCloseEnv(pVnode->pSma); vnodeCommit(pVnode); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); walClose(pVnode->pWal); tqClose(pVnode->pTq); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); - smaCloseEx(pVnode->pSma); + smaClose(pVnode->pSma); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); // destroy handle diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8850ac1c7a..20e507b040 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1422,7 +1422,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t return; } #ifdef BUF_PAGE_DEBUG - qDebug("page_setbuf, groupId:%"PRIu64, groupId); + qDebug("page_setbuf, groupId:%" PRIu64, groupId); #endif doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId); @@ -1570,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; -// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full -// break; -// } + // if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full + // break; + // } } qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, @@ -2868,7 +2868,24 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { pInfo->cond.twindows[0].skey = oldSkey; pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + bool found = false; + for (int32_t i = 0; i < tableSz; i++) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + if (pTableInfo->uid == uid) { + found = true; + pInfo->currentTable = i; + } + } + // TODO after processing drop, + ASSERT(found); + qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, + pInfo->currentTable, tableSz); } + return TSDB_CODE_SUCCESS; } else { @@ -4107,8 +4124,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } else { isNull[index++] = 0; char* data = nodesGetValueFromNode(pValue); - if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){ - if(tTagIsJson(data)){ + if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) { + if (tTagIsJson(data)) { terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; taosMemoryFree(keyBuf); nodesClearList(groupNew); @@ -4173,7 +4190,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); - if(code){ + if (code) { pTaskInfo->code = code; return NULL; } @@ -4202,7 +4219,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo }; if (pHandle) { int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); - if(code){ + if (code) { pTaskInfo->code = code; return NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7b027a0570..061b4ab3c5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1329,6 +1329,13 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->scanCols); } +static int32_t getSysTableDbNameColId(const char* pTable) { + // if (0 == strcmp(TSDB_INS_TABLE_USER_INDEXES, pTable)) { + // return 1; + // } + return TSDB_INS_USER_STABLES_DBNAME_COLID; +} + EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { int32_t code = TSDB_CODE_SUCCESS; ENodeType nType = nodeType(pNode); @@ -1350,7 +1357,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { } SColumnNode* node = (SColumnNode*)pNode; - if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) { + if (getSysTableDbNameColId(node->tableName) == node->colId) { *(int32_t*)pContext = 2; return DEAL_RES_CONTINUE; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a239e6bbcb..c140de24d8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -778,7 +778,6 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { *ignore = true; destroyCmsg(pMsg); return NULL; - // assert(0); } else { conn = exh->handle; transReleaseExHandle(refId); @@ -812,7 +811,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - // transPrintEpSet(&pCtx->epSet); bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 9738fad24d..2b33067381 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -9,8 +9,8 @@ ## ---- db ./test.sh -f tsim/db/alter_option.sim -./test.sh -f tsim/db/alter_replica_13.sim -./test.sh -f tsim/db/alter_replica_31.sim +# ./test.sh -f tsim/db/alter_replica_13.sim +# ./test.sh -f tsim/db/alter_replica_31.sim ./test.sh -f tsim/db/basic1.sim ./test.sh -f tsim/db/basic2.sim ./test.sh -f tsim/db/basic3.sim @@ -24,26 +24,26 @@ ./test.sh -f tsim/db/taosdlog.sim # ---- dnode -./test.sh -f tsim/dnode/balance_replica1.sim -./test.sh -f tsim/dnode/balance_replica3.sim -./test.sh -f tsim/dnode/balance1.sim -./test.sh -f tsim/dnode/balance2.sim -./test.sh -f tsim/dnode/balance3.sim -./test.sh -f tsim/dnode/balancex.sim +# ./test.sh -f tsim/dnode/balance_replica1.sim +# ./test.sh -f tsim/dnode/balance_replica3.sim +# ./test.sh -f tsim/dnode/balance1.sim +# ./test.sh -f tsim/dnode/balance2.sim +# ./test.sh -f tsim/dnode/balance3.sim +# ./test.sh -f tsim/dnode/balancex.sim ./test.sh -f tsim/dnode/create_dnode.sim ./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim -./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim -./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim -./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim -./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim -./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim +# ./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim +# ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim +# ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim +# ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim +# ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim ./test.sh -f tsim/dnode/offline_reason.sim -./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim -./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim -./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim -./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim -./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim -./test.sh -f tsim/dnode/vnode_clean.sim +# ./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim +# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim +# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim +# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim +# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim +# ./test.sh -f tsim/dnode/vnode_clean.sim # ---- insert ./test.sh -f tsim/insert/basic0.sim @@ -71,7 +71,7 @@ ./test.sh -f tsim/qnode/basic1.sim # ---- snode -./test.sh -f tsim/snode/basic1.sim +# ./test.sh -f tsim/snode/basic1.sim # ---- bnode ./test.sh -f tsim/bnode/basic1.sim @@ -104,7 +104,7 @@ # ./test.sh -f tsim/stream/triggerSession0.sim ./test.sh -f tsim/stream/partitionby.sim ./test.sh -f tsim/stream/partitionby1.sim -./test.sh -f tsim/stream/schedSnode.sim +# ./test.sh -f tsim/stream/schedSnode.sim ./test.sh -f tsim/stream/windowClose.sim ./test.sh -f tsim/stream/ignoreExpiredData.sim @@ -169,13 +169,13 @@ ./test.sh -f tsim/valgrind/checkError.sim -v # --- vnode -#./test.sh -f tsim/vnode/replica3_basic.sim -#./test.sh -f tsim/vnode/replica3_repeat.sim -./test.sh -f tsim/vnode/replica3_vgroup.sim -#./test.sh -f tsim/vnode/replica3_many.sim -#./test.sh -f tsim/vnode/replica3_import.sim -./test.sh -f tsim/vnode/stable_balance_replica1.sim -./test.sh -f tsim/vnode/stable_dnode2_stop.sim +# ./test.sh -f tsim/vnode/replica3_basic.sim +# ./test.sh -f tsim/vnode/replica3_repeat.sim +# ./test.sh -f tsim/vnode/replica3_vgroup.sim +# ./test.sh -f tsim/vnode/replica3_many.sim +# ./test.sh -f tsim/vnode/replica3_import.sim +# ./test.sh -f tsim/vnode/stable_balance_replica1.sim +# ./test.sh -f tsim/vnode/stable_dnode2_stop.sim ./test.sh -f tsim/vnode/stable_dnode2.sim ./test.sh -f tsim/vnode/stable_dnode3.sim ./test.sh -f tsim/vnode/stable_replica3_dnode6.sim diff --git a/tests/script/tsim/sma/drop_sma.sim b/tests/script/tsim/sma/drop_sma.sim index 73ada2de56..78f86f6e19 100644 --- a/tests/script/tsim/sma/drop_sma.sim +++ b/tests/script/tsim/sma/drop_sma.sim @@ -50,6 +50,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t print --> create sma sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m); +print --> show sma +sql show indexes from stb from d1; +if $rows != 1 then + return -1 +endi +if $data[0][0] != sma_index_name1 then + return -1 +endi +if $data[0][1] != d1 then + return -1 +endi +if $data[0][2] != stb then + return -1 +endi + print --> drop stb sql drop table stb; @@ -61,6 +76,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t print --> create sma sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m); +print --> show sma +sql show indexes from stb from d1; +if $rows != 1 then + return -1 +endi +if $data[0][0] != sma_index_name1 then + return -1 +endi +if $data[0][1] != d1 then + return -1 +endi +if $data[0][2] != stb then + return -1 +endi + print --> drop stb sql drop table stb; diff --git a/tests/system-test/1-insert/alter_stable.py b/tests/system-test/1-insert/alter_stable.py index cd64e3ddfe..a4cec78138 100644 --- a/tests/system-test/1-insert/alter_stable.py +++ b/tests/system-test/1-insert/alter_stable.py @@ -16,135 +16,150 @@ import string from util.log import * from util.cases import * from util.sql import * - +from util.sqlset import * +from util import constant +from util.common import * class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + self.ntbname = 'ntb' + self.stbname = 'stb' + self.binary_length = 20 # the length of binary for column_dict + self.nchar_length = 20 # the length of nchar for column_dict + self.column_dict = { + 'ts' : 'timestamp', + 'col1': 'tinyint', + 'col2': 'smallint', + 'col3': 'int', + 'col4': 'bigint', + 'col5': 'tinyint unsigned', + 'col6': 'smallint unsigned', + 'col7': 'int unsigned', + 'col8': 'bigint unsigned', + 'col9': 'float', + 'col10': 'double', + 'col11': 'bool', + 'col12': f'binary({self.binary_length})', + 'col13': f'nchar({self.nchar_length})' + } + self.tag_dict = { + 'ts_tag' : 'timestamp', + 't1': 'tinyint', + 't2': 'smallint', + 't3': 'int', + 't4': 'bigint', + 't5': 'tinyint unsigned', + 't6': 'smallint unsigned', + 't7': 'int unsigned', + 't8': 'bigint unsigned', + 't9': 'float', + 't10': 'double', + 't11': 'bool', + 't12': f'binary({self.binary_length})', + 't13': f'nchar({self.nchar_length})' + } + self.tag_list = [ + f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"' + ] + self.tbnum = 1 + self.values_list = [ + f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"' + ] + self.column_add_dict = { + 'col_time' : 'timestamp', + 'col_tinyint' : 'tinyint', + 'col_smallint' : 'smallint', + 'col_int' : 'int', + 'col_bigint' : 'bigint', + 'col_untinyint' : 'tinyint unsigned', + 'col_smallint' : 'smallint unsigned', + 'col_int' : 'int unsigned', + 'col_bigint' : 'bigint unsigned', + 'col_bool' : 'bool', + 'col_float' : 'float', + 'col_double' : 'double', + 'col_binary' : f'binary({constant.BINARY_LENGTH_MAX})', + 'col_nchar' : f'nchar({constant.NCAHR_LENGTH_MAX})' - def get_long_name(self, length, mode="mixed"): - """ - generate long name - mode could be numbers/letters/letters_mixed/mixed - """ - if mode == "numbers": - population = string.digits - elif mode == "letters": - population = string.ascii_letters.lower() - elif mode == "letters_mixed": - population = string.ascii_letters.upper() + string.ascii_letters.lower() - else: - population = string.ascii_letters.lower() + string.digits - return "".join(random.choices(population, k=length)) - def alter_stable_column_check(self,dbname,stbname,tbname): - tdSql.execute(f'create database if not exists {dbname}') - tdSql.execute(f'use {dbname}') - tdSql.execute( - f'create stable {stbname} (ts timestamp, c1 tinyint, c2 smallint, c3 int, \ - c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 bool,c12 binary(20),c13 nchar(20)) tags(t0 int) ') - tdSql.execute(f'create table {tbname} using {stbname} tags(1)') - tdSql.execute(f'insert into {tbname} values (now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")') - tdSql.execute(f'alter stable {stbname} add column c14 int') - tdSql.query(f'select c14 from {stbname}') - tdSql.checkRows(1) - tdSql.execute(f'alter stable {stbname} add column `c15` int') - tdSql.query(f'select c15 from {stbname}') - tdSql.checkRows(1) - tdSql.query(f'describe {stbname}') - tdSql.checkRows(17) - tdSql.execute(f'alter stable {stbname} drop column c14') - tdSql.query(f'describe {stbname}') - tdSql.checkRows(16) - tdSql.execute(f'alter stable {stbname} drop column `c15`') - tdSql.query(f'describe {stbname}') - tdSql.checkRows(15) - tdSql.execute(f'alter stable {stbname} modify column c12 binary(30)') - tdSql.query(f'describe {stbname}') - tdSql.checkData(12,2,30) - tdSql.execute(f'alter stable {stbname} modify column `c12` binary(35)') - tdSql.query(f'describe {stbname}') - tdSql.checkData(12,2,35) - tdSql.error(f'alter stable {stbname} modify column `c12` binary(34)') - tdSql.execute(f'alter stable {stbname} modify column c13 nchar(30)') - tdSql.query(f'describe {stbname}') - tdSql.checkData(13,2,30) - tdSql.error(f'alter stable {stbname} modify column c13 nchar(29)') - tdSql.error(f'alter stable {stbname} rename column c1 c21') - tdSql.error(f'alter stable {stbname} modify column c1 int') - tdSql.error(f'alter stable {stbname} modify column c4 int') - tdSql.error(f'alter stable {stbname} modify column c8 int') - tdSql.error(f'alter stable {stbname} modify column c1 unsigned int') - tdSql.error(f'alter stable {stbname} modify column c9 double') - tdSql.error(f'alter stable {stbname} modify column c10 float') - tdSql.error(f'alter stable {stbname} modify column c11 int') - tdSql.error(f'alter stable {stbname} drop tag t0') - tdSql.execute(f'drop database {dbname}') - - def alter_stable_tag_check(self,dbname,stbname,tbname): - tdSql.execute(f'create database if not exists {dbname}') - tdSql.execute(f'use {dbname}') - tdSql.execute( - f'create stable {stbname} (ts timestamp, c1 int) tags(ts_tag timestamp, t1 tinyint, t2 smallint, t3 int, \ - t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 bool,t12 binary(20),t13 nchar(20)) ') - tdSql.execute(f'create table {tbname} using {stbname} tags(now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")') - tdSql.execute(f'insert into {tbname} values(now,1)') - - tdSql.execute(f'alter stable {stbname} add tag t14 int') - tdSql.query(f'select t14 from {stbname}') - tdSql.checkRows(1) - tdSql.execute(f'alter stable {stbname} add tag `t15` int') - tdSql.query(f'select t14 from {stbname}') - tdSql.checkRows(1) - tdSql.query(f'describe {stbname}') - tdSql.checkRows(18) - tdSql.execute(f'alter stable {stbname} drop tag t14') - tdSql.query(f'describe {stbname}') - tdSql.checkRows(17) - tdSql.execute(f'alter stable {stbname} drop tag `t15`') - tdSql.query(f'describe {stbname}') - tdSql.checkRows(16) - tdSql.execute(f'alter stable {stbname} modify tag t12 binary(30)') - tdSql.query(f'describe {stbname}') - tdSql.checkData(14,2,30) - tdSql.execute(f'alter stable {stbname} modify tag `t12` binary(35)') - tdSql.query(f'describe {stbname}') - tdSql.checkData(14,2,35) - tdSql.error(f'alter stable {stbname} modify tag `t12` binary(34)') - tdSql.execute(f'alter stable {stbname} modify tag t13 nchar(30)') - tdSql.query(f'describe {stbname}') - tdSql.checkData(15,2,30) - tdSql.error(f'alter stable {stbname} modify tag t13 nchar(29)') - tdSql.execute(f'alter table {stbname} rename tag t1 t21') - tdSql.query(f'describe {stbname}') - tdSql.checkData(3,0,'t21') - tdSql.execute(f'alter table {stbname} rename tag `t21` t1') - tdSql.query(f'describe {stbname}') - tdSql.checkData(3,0,'t1') - - for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']: - for j in [1,2,3]: - tdSql.error(f'alter stable {stbname} modify tag t{j} {i}') - for i in ['int','unsigned int','float','binary(10)','nchar(10)']: - tdSql.error(f'alter stable {stbname} modify tag t8 {i}') - tdSql.error(f'alter stable {stbname} modify tag t4 int') - tdSql.error(f'alter stable {stbname} drop column t0') - #!bug TD-16410 - # tdSql.error(f'alter stable {tbname} set tag t1=100 ') - # tdSql.execute(f'create table ntb (ts timestamp,c0 int)') - tdSql.error(f'alter stable ntb add column c2 ') - tdSql.execute(f'drop database {dbname}') + } + def alter_stable_check(self): + tdSql.prepare() + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) + tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict)) + for i in self.values_list: + tdSql.execute(f'insert into {self.ntbname} values({i})') + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + for key,values in self.column_add_dict.items(): + tdSql.execute(f'alter stable {self.stbname} add column {key} {values}') + tdSql.query(f'describe {self.stbname}') + tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)+1) + for i in range(self.tbnum): + tdSql.query(f'describe {self.stbname}_{i}') + tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)+1) + tdSql.query(f'select {key} from {self.stbname}_{i}') + tdSql.checkRows(len(self.values_list)) + for i in range(self.tbnum): + tdSql.error(f'alter stable {self.stbname}_{i} add column {key} {values}') + tdSql.error(f'alter stable {self.stbname}_{i} drop column {key}') + #! bug TD-16921 + #tdSql.error(f'alter stable {self.ntbname} add column {key} {values}') + #tdSql.error(f'alter stable {self.ntbname} drop column {key}') + tdSql.execute(f'alter stable {self.stbname} drop column {key}') + tdSql.query(f'describe {self.stbname}') + tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)) + for i in range(self.tbnum): + tdSql.query(f'describe {self.stbname}_{i}') + tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)) + tdSql.error(f'select {key} from {self.stbname} ') + for key,values in self.column_dict.items(): + if 'binary' in values.lower(): + v = f'binary({self.binary_length+1})' + v_error = f'binary({self.binary_length-1})' + tdSql.error(f'alter stable {self.stbname} modify column {key} {v_error}') + tdSql.execute(f'alter stable {self.stbname} modify column {key} {v}') + tdSql.query(f'describe {self.stbname}') + result = tdCom.getOneRow(1,'VARCHAR') + tdSql.checkEqual(result[0][2],self.binary_length+1) + for i in range(self.tbnum): + tdSql.query(f'describe {self.stbname}_{i}') + result = tdCom.getOneRow(1,'VARCHAR') + tdSql.checkEqual(result[0][2],self.binary_length+1) + tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}') + #! bug TD-16921 + # tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}') + elif 'nchar' in values.lower(): + v = f'nchar({self.binary_length+1})' + v_error = f'nchar({self.binary_length-1})' + tdSql.error(f'alter stable {self.stbname} modify column {key} {v_error}') + tdSql.execute(f'alter stable {self.stbname} modify column {key} {v}') + tdSql.query(f'describe {self.stbname}') + result = tdCom.getOneRow(1,'NCHAR') + tdSql.checkEqual(result[0][2],self.binary_length+1) + for i in range(self.tbnum): + tdSql.query(f'describe {self.stbname}_{i}') + result = tdCom.getOneRow(1,'NCHAR') + tdSql.checkEqual(result[0][2],self.binary_length+1) + tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}') + #! bug TD-16921 + #tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}') + else: + for v in self.column_dict.values(): + tdSql.error(f'alter stable {self.stbname} modify column {key} {v}') + # tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}') + for i in range(self.tbnum): + tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}') def run(self): - dbname = self.get_long_name(length=10, mode="letters") - stbname = self.get_long_name(length=5, mode="letters") - tbname = self.get_long_name(length=5, mode="letters") - self.alter_stable_column_check(dbname,stbname,tbname) - self.alter_stable_tag_check(dbname,stbname,tbname) - + self.alter_stable_check() def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/1-insert/alter_table.py b/tests/system-test/1-insert/alter_table.py index 0f7a830634..a2613c39e7 100644 --- a/tests/system-test/1-insert/alter_table.py +++ b/tests/system-test/1-insert/alter_table.py @@ -126,6 +126,7 @@ class TDTestCase: tdSql.execute(f'alter table {self.ntbname} rename column {key} {rename_str}') tdSql.query(f'select {rename_str} from {self.ntbname}') tdSql.checkRows(1) + tdSql.error(f'select {key} from {self.ntbname}') def alter_check_tb(self): tag_tinyint = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX) @@ -277,7 +278,11 @@ class TDTestCase: else: for v in self.column_dict.values(): tdSql.error(f'alter table {self.stbname} modify column {key} {v}') - + for key,values in self.column_dict.items(): + rename_str = f'{tdCom.getLongName(constant.COL_NAME_LENGTH_MAX,"letters")}' + tdSql.error(f'alter table {self.stbname} rename column {key} {rename_str}') + for i in range(self.tbnum): + tdSql.error(f'alter table {self.stbname}_{i} rename column {key} {rename_str}') def run(self): self.alter_check_ntb() self.alter_check_tb() diff --git a/tests/system-test/2-query/Now.py b/tests/system-test/2-query/Now.py index 6785fddc6f..3caf632209 100644 --- a/tests/system-test/2-query/Now.py +++ b/tests/system-test/2-query/Now.py @@ -40,6 +40,7 @@ class TDTestCase: self.time_unit = ['b','u','a','s','m','h','d','w'] self.symbol = ['+','-','*','/'] self.error_values = [1.5,'abc','"abc"','!@','today()'] + self.db_percision = ['ms','us','ns'] def tbtype_check(self,tb_type): if tb_type == 'normal table' or tb_type == 'child table': tdSql.checkRows(len(self.values_list)) @@ -70,23 +71,29 @@ class TDTestCase: tdSql.checkData(i,0,None) def now_check_ntb(self): - tdSql.prepare() - tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict)) - for value in self.values_list: - tdSql.execute( - f'insert into {self.ntbname} values({value})') - self.data_check(self.ntbname,'normal table') + for time_unit in self.db_percision: + tdSql.execute(f'create database db precision "{time_unit}"') + tdSql.execute('use db') + tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict)) + for value in self.values_list: + tdSql.execute( + f'insert into {self.ntbname} values({value})') + self.data_check(self.ntbname,'normal table') + tdSql.execute('drop database db') def now_check_stb(self): - tdSql.prepare() - tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) - for i in range(self.tbnum): - tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})") - for value in self.values_list: - tdSql.execute(f'insert into {self.stbname}_{i} values({value})') - for i in range(self.tbnum): - self.data_check(f'{self.stbname}_{i}','child table') - self.data_check(self.stbname,'stable') + for time_unit in self.db_percision: + tdSql.execute(f'create database db precision "{time_unit}"') + tdSql.execute('use db') + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})") + for value in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({value})') + for i in range(self.tbnum): + self.data_check(f'{self.stbname}_{i}','child table') + self.data_check(self.stbname,'stable') + tdSql.execute('drop database db') def run(self): # sourcery skip: extract-duplicate-method self.now_check_ntb() diff --git a/tests/system-test/2-query/Today.py b/tests/system-test/2-query/Today.py index a9c3215a12..e6199d629e 100644 --- a/tests/system-test/2-query/Today.py +++ b/tests/system-test/2-query/Today.py @@ -5,6 +5,7 @@ from util.log import * from util.sql import * from util.cases import * import datetime +import pandas as pd class TDTestCase: @@ -12,8 +13,8 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) - self.today_date = datetime.datetime.strptime( - datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + self.today_date = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + self.today_ts = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d").timestamp() self.time_unit = ['b','u','a','s','m','h','d','w'] self.error_param = ['1.5','abc','!@#','"abc"','today()'] self.arithmetic_operators = ['+','-','*','/'] @@ -41,7 +42,7 @@ class TDTestCase: f'today(),3,3.333,333.333333,now()', f'today()-1d,10,11.11,99.999999,now()', f'today()+1d,1,1.55,100.555555,today()'] - + self.db_percision = ['ms','us','ns'] def set_create_normaltable_sql(self, ntbname, column_dict): column_sql = '' for k, v in column_dict.items(): @@ -57,7 +58,8 @@ class TDTestCase: tag_sql += f"{k} {v}," create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})' return create_stb_sql - def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'): + + def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb',precision = 'ms'): for k,v in column_dict.items(): num_up = 0 num_down = 0 @@ -65,12 +67,27 @@ class TDTestCase: if v.lower() == 'timestamp': tdSql.query(f'select {k} from {tbname}') for i in tdSql.queryResult: - if i[0] > self.today_date: - num_up += 1 - elif i[0] == self.today_date: - num_same += 1 - elif i[0] < self.today_date: - num_down += 1 + if precision == 'ms': + if int(i[0].timestamp())*1000 > int(self.today_ts)*1000: + num_up += 1 + elif int(i[0].timestamp())*1000 == int(self.today_ts)*1000: + num_same += 1 + elif int(i[0].timestamp())*1000 < int(self.today_ts)*1000: + num_down += 1 + elif precision == 'us': + if int(i[0].timestamp())*1000000 > int(self.today_ts)*1000000: + num_up += 1 + elif int(i[0].timestamp())*1000000 == int(self.today_ts)*1000000: + num_same += 1 + elif int(i[0].timestamp())*1000000 < int(self.today_ts)*1000000: + num_down += 1 + elif precision == 'ns': + if i[0] > int(self.today_ts)*1000000000: + num_up += 1 + elif i[0] == int(self.today_ts)*1000000000: + num_same += 1 + elif i[0] < int(self.today_ts)*1000000000: + num_down += 1 tdSql.query(f"select today() from {tbname}") tdSql.checkRows(len(values_list)*tb_num) tdSql.checkData(0, 0, str(self.today_date)) @@ -130,32 +147,36 @@ class TDTestCase: for i in range(num_same): tdSql.checkData(i, 0, str(self.today_date)) def today_check_ntb(self): - tdSql.prepare() - tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict)) - for i in self.values_list: - tdSql.execute( - f'insert into {self.ntbname} values({i})') - self.data_check(self.column_dict,self.ntbname,self.values_list) - tdSql.execute('drop database db') + for time_unit in self.db_percision: + print(time_unit) + tdSql.execute(f'create database db precision "{time_unit}"') + tdSql.execute('use db') + tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict)) + for i in self.values_list: + tdSql.execute( + f'insert into {self.ntbname} values({i})') + self.data_check(self.column_dict,self.ntbname,self.values_list,1,'tb',time_unit) + tdSql.execute('drop database db') def today_check_stb_tb(self): - tdSql.prepare() - tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) - for i in range(self.tbnum): - tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') - for j in self.values_list: - tdSql.execute(f'insert into {self.stbname}_{i} values ({j})') - # check child table - for i in range(self.tbnum): - self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list) - # check stable - self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb') - tdSql.execute('drop database db') + for time_unit in self.db_percision: + print(time_unit) + tdSql.execute(f'create database db precision "{time_unit}"') + tdSql.execute('use db') + tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values ({j})') + # check child table + for i in range(self.tbnum): + self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list,1,'tb',time_unit) + # check stable + self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb',time_unit) + tdSql.execute('drop database db') def run(self): # sourcery skip: extract-duplicate-method - tdLog.printNoPrefix("==========check today() for normal table ==========") self.today_check_ntb() - tdLog.printNoPrefix("==========check today() for stable and child table==========") self.today_check_stb_tb() def stop(self): diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py new file mode 100644 index 0000000000..a4d6648276 --- /dev/null +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py @@ -0,0 +1,242 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + tdDnodes.stop(1) + # tdDnodes.start(1) + tdDnodes.starttaosd(1) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectRowsList[0] != resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + totalRowsInserted = expectRowsList[0] + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 1 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3) + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 0") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): + tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + firstConsumeRows = resultList[0] + + # reinit consume info, and start tmq_sim, then check consume result + tmqCom.initConsumerTable() + consumerId = 2 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 1") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + actConsumeTotalRows = firstConsumeRows + resultList[0] + + if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): + tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) + tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + self.tmqCase1() + self.tmqCase2() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index ead3970b48..8f5791f1c5 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -116,8 +116,8 @@ python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/function_null.py python3 ./test.py -f 2-query/queryQnode.py -python3 ./test.py -f 6-cluster/5dnode1mnode.py -python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 +#python3 ./test.py -f 6-cluster/5dnode1mnode.py +#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 #python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 @@ -157,3 +157,4 @@ python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqAlterSchema.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py +python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py diff --git a/tools/taos-tools b/tools/taos-tools index 5fdd694621..7105027650 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 5fdd694621fbb7bd2d6102ff4feaec92a7001038 +Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01