Merge pull request #29913 from taosdata/enh/TS-5445-3.0

feat: compact meta
This commit is contained in:
Simon Guan 2025-02-28 23:36:25 +08:00 committed by GitHub
commit f171c81442
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 565 additions and 321 deletions

View File

@ -20,9 +20,9 @@ if(${BUILD_WITH_SQLITE})
add_subdirectory(sqlite) add_subdirectory(sqlite)
endif(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE})
if(${BUILD_S3}) # if(${BUILD_S3})
add_subdirectory(azure) # add_subdirectory(azure)
endif() # endif()
add_subdirectory(tdev) add_subdirectory(tdev)
add_subdirectory(lz4) add_subdirectory(lz4)

View File

@ -16,8 +16,8 @@ TDengine is designed for various writing scenarios, and many of these scenarios
### Syntax ### Syntax
```sql ```sql
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY']; COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY']; COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
SHOW COMPACTS; SHOW COMPACTS;
SHOW COMPACT compact_id; SHOW COMPACT compact_id;
KILL COMPACT compact_id; KILL COMPACT compact_id;
@ -30,6 +30,7 @@ KILL COMPACT compact_id;
- COMPACT will merge multiple STT files - COMPACT will merge multiple STT files
- You can specify the start time of the COMPACT data with the start with keyword - You can specify the start time of the COMPACT data with the start with keyword
- You can specify the end time of the COMPACT data with the end with keyword - You can specify the end time of the COMPACT data with the end with keyword
- You can specify the META_ONLY keyword to only compact the meta data which are not compacted by default
- The COMPACT command will return the ID of the COMPACT task - The COMPACT command will return the ID of the COMPACT task
- COMPACT tasks are executed asynchronously in the background, and you can view the progress of COMPACT tasks using the SHOW COMPACTS command - COMPACT tasks are executed asynchronously in the background, and you can view the progress of COMPACT tasks using the SHOW COMPACTS command
- The SHOW command will return the ID of the COMPACT task, and you can terminate the COMPACT task using the KILL COMPACT command - The SHOW command will return the ID of the COMPACT task, and you can terminate the COMPACT task using the KILL COMPACT command

View File

@ -17,8 +17,8 @@ TDengine 面向多种写入场景而很多写入场景下TDengine 的存
### 语法 ### 语法
```SQL ```SQL
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY']; COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY']; COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
SHOW COMPACTS; SHOW COMPACTS;
SHOW COMPACT compact_id; SHOW COMPACT compact_id;
KILL COMPACT compact_id; KILL COMPACT compact_id;
@ -32,6 +32,7 @@ KILL COMPACT compact_id;
- COMPACT 会合并多个 STT 文件 - COMPACT 会合并多个 STT 文件
- 可通过 start with 关键字指定 COMPACT 数据的起始时间 - 可通过 start with 关键字指定 COMPACT 数据的起始时间
- 可通过 end with 关键字指定 COMPACT 数据的终止时间 - 可通过 end with 关键字指定 COMPACT 数据的终止时间
- 可通过 `META_ONLY` 关键字指定只 compact 元数据。元数据默认情况下不会 compact。
- COMPACT 命令会返回 COMPACT 任务的 ID - COMPACT 命令会返回 COMPACT 任务的 ID
- COMPACT 任务会在后台异步执行,可以通过 SHOW COMPACTS 命令查看 COMPACT 任务的进度 - COMPACT 任务会在后台异步执行,可以通过 SHOW COMPACTS 命令查看 COMPACT 任务的进度
- SHOW 命令会返回 COMPACT 任务的 ID可以通过 KILL COMPACT 命令终止 COMPACT 任务 - SHOW 命令会返回 COMPACT 任务的 ID可以通过 KILL COMPACT 命令终止 COMPACT 任务

View File

@ -1640,6 +1640,7 @@ typedef struct {
int32_t sqlLen; int32_t sqlLen;
char* sql; char* sql;
SArray* vgroupIds; SArray* vgroupIds;
int8_t metaOnly;
} SCompactDbReq; } SCompactDbReq;
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
@ -2084,6 +2085,7 @@ typedef struct {
int64_t compactStartTime; int64_t compactStartTime;
STimeWindow tw; STimeWindow tw;
int32_t compactId; int32_t compactId;
int8_t metaOnly;
} SCompactVnodeReq; } SCompactVnodeReq;
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);

View File

@ -169,6 +169,7 @@ typedef struct SCompactDatabaseStmt {
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
SNode* pStart; SNode* pStart;
SNode* pEnd; SNode* pEnd;
bool metaOnly;
} SCompactDatabaseStmt; } SCompactDatabaseStmt;
typedef struct SCompactVgroupsStmt { typedef struct SCompactVgroupsStmt {
@ -177,6 +178,7 @@ typedef struct SCompactVgroupsStmt {
SNodeList* vgidList; SNodeList* vgidList;
SNode* pStart; SNode* pStart;
SNode* pEnd; SNode* pEnd;
bool metaOnly;
} SCompactVgroupsStmt; } SCompactVgroupsStmt;
typedef struct STableOptions { typedef struct STableOptions {

View File

@ -4686,6 +4686,8 @@ int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq)
} }
} }
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->metaOnly));
tEndEncode(&encoder); tEndEncode(&encoder);
_exit: _exit:
@ -4729,6 +4731,12 @@ int32_t tDeserializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq
} }
} }
} }
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->metaOnly));
} else {
pReq->metaOnly = false;
}
tEndDecode(&decoder); tEndDecode(&decoder);
_exit: _exit:
@ -7156,6 +7164,7 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->tw.ekey)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->tw.ekey));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->compactId)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->compactId));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->metaOnly));
tEndEncode(&encoder); tEndEncode(&encoder);
@ -7193,6 +7202,12 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->compactId)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->compactId));
} }
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->metaOnly));
} else {
pReq->metaOnly = false;
}
tEndDecode(&decoder); tEndDecode(&decoder);
_exit: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -170,8 +170,6 @@ static void dmSetSignalHandle() {
#endif #endif
} }
extern bool generateNewMeta;
static int32_t dmParseArgs(int32_t argc, char const *argv[]) { static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
global.startTime = taosGetTimestampMs(); global.startTime = taosGetTimestampMs();
@ -210,8 +208,6 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
global.dumpSdb = true; global.dumpSdb = true;
} else if (strcmp(argv[i], "-dTxn") == 0) { } else if (strcmp(argv[i], "-dTxn") == 0) {
global.deleteTrans = true; global.deleteTrans = true;
} else if (strcmp(argv[i], "-r") == 0) {
generateNewMeta = true;
} else if (strcmp(argv[i], "-E") == 0) { } else if (strcmp(argv[i], "-E") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) { if (strlen(argv[++i]) >= PATH_MAX) {

View File

@ -47,7 +47,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnode
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup, int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray, SVgObj* pNewVgroup); SArray *pArray, SVgObj* pNewVgroup);
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs, int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
STimeWindow tw); STimeWindow tw, bool metaOnly);
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup, int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray); SArray *pArray);

View File

@ -927,7 +927,8 @@ static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pD
return 0; return 0;
} }
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds); extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
bool metaOnly);
static int32_t mndCompactDispatch(SRpcMsg *pReq) { static int32_t mndCompactDispatch(SRpcMsg *pReq) {
int32_t code = 0; int32_t code = 0;
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
@ -982,7 +983,7 @@ static int32_t mndCompactDispatch(SRpcMsg *pReq) {
.skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision), .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
.ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)}; .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL)) == 0) { if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false)) == 0) {
mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64 mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
"m, end:%" PRIi64 "m, offset:%" PRIi8 "h", "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime, pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,

View File

@ -3632,11 +3632,12 @@ bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
} }
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs, static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
STimeWindow tw) { STimeWindow tw, bool metaOnly) {
SCompactVnodeReq compactReq = {0}; SCompactVnodeReq compactReq = {0};
compactReq.dbUid = pDb->uid; compactReq.dbUid = pDb->uid;
compactReq.compactStartTime = compactTs; compactReq.compactStartTime = compactTs;
compactReq.tw = tw; compactReq.tw = tw;
compactReq.metaOnly = metaOnly;
tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId); mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
@ -3667,13 +3668,13 @@ static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgrou
} }
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs, static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
STimeWindow tw) { STimeWindow tw, bool metaOnly) {
int32_t code = 0; int32_t code = 0;
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw); void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly);
if (pReq == NULL) { if (pReq == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
@ -3693,7 +3694,7 @@ static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *
} }
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs, int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
STimeWindow tw) { STimeWindow tw, bool metaOnly) {
TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw)); TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly));
return 0; return 0;
} }

View File

@ -96,6 +96,7 @@ if(TD_VNODE_PLUGINS)
vnode vnode
PRIVATE PRIVATE
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/metaCompact.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompactMonitor.c ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompactMonitor.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c
) )

View File

@ -24,7 +24,6 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB; typedef struct SMetaDB SMetaDB;
typedef struct SMetaCache SMetaCache; typedef struct SMetaCache SMetaCache;
@ -103,8 +102,6 @@ struct SMeta {
// stream // stream
TTB* pStreamDb; TTB* pStreamDb;
SMetaIdx* pIdx;
SMetaCache* pCache; SMetaCache* pCache;
}; };

View File

@ -168,7 +168,7 @@ int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tb
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock); int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock);
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
@ -487,7 +487,13 @@ struct SVnode {
// commit variables // commit variables
SVATaskID commitTask; SVATaskID commitTask;
struct {
TdThreadRwlock metaRWLock;
SMeta* pMeta; SMeta* pMeta;
SMeta* pNewMeta;
SVATaskID metaCompactTask;
};
SSma* pSma; SSma* pSma;
STsdb* pTsdb; STsdb* pTsdb;
SWal* pWal; SWal* pWal;

View File

@ -135,12 +135,17 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
return 0; return 0;
} }
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { int metaDecodeEntryImpl(SDecoder *pCoder, SMetaEntry *pME, bool headerOnly) {
TAOS_CHECK_RETURN(tStartDecode(pCoder)); TAOS_CHECK_RETURN(tStartDecode(pCoder));
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version)); TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version));
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type)); TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type));
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid)); TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid));
if (headerOnly) {
tEndDecode(pCoder);
return 0;
}
if (pME->type > 0) { if (pME->type > 0) {
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name)); TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name));
@ -209,6 +214,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
return 0; return 0;
} }
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { return metaDecodeEntryImpl(pCoder, pME, false); }
static int32_t metaCloneSchema(const SSchemaWrapper *pSrc, SSchemaWrapper *pDst) { static int32_t metaCloneSchema(const SSchemaWrapper *pSrc, SSchemaWrapper *pDst) {
if (pSrc == NULL || pDst == NULL) { if (pSrc == NULL || pDst == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;

View File

@ -133,7 +133,7 @@ static void doScan(SMeta *pMeta) {
} }
} }
static int32_t metaOpenImpl(SVnode *pVnode, SMeta **ppMeta, const char *metaDir, int8_t rollback) { int32_t metaOpenImpl(SVnode *pVnode, SMeta **ppMeta, const char *metaDir, int8_t rollback) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -251,187 +251,35 @@ _exit:
return code; return code;
} }
bool generateNewMeta = false; void vnodeGetMetaPath(SVnode *pVnode, const char *metaDir, char *fname) {
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fname, TSDB_FILENAME_LEN);
static int32_t metaGenerateNewMeta(SMeta **ppMeta) { int32_t offset = strlen(fname);
SMeta *pNewMeta = NULL; snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, metaDir);
SMeta *pMeta = *ppMeta;
SVnode *pVnode = pMeta->pVnode;
metaInfo("vgId:%d start to generate new meta", TD_VID(pMeta->pVnode));
// Open a new meta for orgainzation
int32_t code = metaOpenImpl(pMeta->pVnode, &pNewMeta, VNODE_META_TMP_DIR, false);
if (code) {
return code;
}
code = metaBegin(pNewMeta, META_BEGIN_HEAP_NIL);
if (code) {
return code;
}
// i == 0, scan super table
// i == 1, scan normal table and child table
for (int i = 0; i < 2; i++) {
TBC *uidCursor = NULL;
int32_t counter = 0;
code = tdbTbcOpen(pMeta->pUidIdx, &uidCursor, NULL);
if (code) {
metaError("vgId:%d failed to open uid index cursor, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
code = tdbTbcMoveToFirst(uidCursor);
if (code) {
metaError("vgId:%d failed to move to first, reason:%s", TD_VID(pVnode), tstrerror(code));
tdbTbcClose(uidCursor);
return code;
}
for (;;) {
const void *pKey;
int kLen;
const void *pVal;
int vLen;
if (tdbTbcGet(uidCursor, &pKey, &kLen, &pVal, &vLen) < 0) {
break;
}
tb_uid_t uid = *(tb_uid_t *)pKey;
SUidIdxVal *pUidIdxVal = (SUidIdxVal *)pVal;
if ((i == 0 && (pUidIdxVal->suid && pUidIdxVal->suid == uid)) // super table
|| (i == 1 && (pUidIdxVal->suid == 0 || pUidIdxVal->suid != uid)) // normal table and child table
) {
counter++;
if (i == 0) {
metaInfo("vgId:%d counter:%d new meta handle %s table uid:%" PRId64, TD_VID(pVnode), counter, "super", uid);
} else {
metaInfo("vgId:%d counter:%d new meta handle %s table uid:%" PRId64, TD_VID(pVnode), counter,
pUidIdxVal->suid == 0 ? "normal" : "child", uid);
}
// fetch table entry
void *value = NULL;
int valueSize = 0;
if (tdbTbGet(pMeta->pTbDb,
&(STbDbKey){
.version = pUidIdxVal->version,
.uid = uid,
},
sizeof(uid), &value, &valueSize) == 0) {
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, value, valueSize);
if (metaDecodeEntry(&dc, &me) == 0) {
if (me.type == TSDB_CHILD_TABLE &&
tdbTbGet(pMeta->pUidIdx, &me.ctbEntry.suid, sizeof(me.ctbEntry.suid), NULL, NULL) != 0) {
metaError("vgId:%d failed to get super table uid:%" PRId64 " for child table uid:%" PRId64,
TD_VID(pVnode), me.ctbEntry.suid, uid);
} else if (metaHandleEntry2(pNewMeta, &me) != 0) {
metaError("vgId:%d failed to handle entry, uid:%" PRId64, TD_VID(pVnode), uid);
}
}
tDecoderClear(&dc);
}
tdbFree(value);
}
code = tdbTbcMoveToNext(uidCursor);
if (code) {
metaError("vgId:%d failed to move to next, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
}
tdbTbcClose(uidCursor);
}
code = metaCommit(pNewMeta, pNewMeta->txn);
if (code) {
metaError("vgId:%d failed to commit, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
code = metaFinishCommit(pNewMeta, pNewMeta->txn);
if (code) {
metaError("vgId:%d failed to finish commit, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
if ((code = metaBegin(pNewMeta, META_BEGIN_HEAP_NIL)) != 0) {
metaError("vgId:%d failed to begin new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
}
metaClose(&pNewMeta);
metaInfo("vgId:%d finish to generate new meta", TD_VID(pVnode));
return 0;
} }
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
if (generateNewMeta) { int32_t code = TSDB_CODE_SUCCESS;
char path[TSDB_FILENAME_LEN] = {0}; char metaDir[TSDB_FILENAME_LEN] = {0};
char oldMetaPath[TSDB_FILENAME_LEN] = {0}; char metaTempDir[TSDB_FILENAME_LEN] = {0};
char newMetaPath[TSDB_FILENAME_LEN] = {0};
char backupMetaPath[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); vnodeGetMetaPath(pVnode, VNODE_META_DIR, metaDir);
snprintf(oldMetaPath, sizeof(oldMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_DIR); vnodeGetMetaPath(pVnode, VNODE_META_TMP_DIR, metaTempDir);
snprintf(newMetaPath, sizeof(newMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_TMP_DIR);
snprintf(backupMetaPath, sizeof(backupMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_BACKUP_DIR);
bool oldMetaExist = taosCheckExistFile(oldMetaPath); // Check file states
bool newMetaExist = taosCheckExistFile(newMetaPath); if (!taosCheckExistFile(metaDir) && taosCheckExistFile(metaTempDir)) {
bool backupMetaExist = taosCheckExistFile(backupMetaPath); code = taosRenameFile(metaTempDir, metaDir);
if ((!backupMetaExist && !oldMetaExist && newMetaExist) // case 2
|| (backupMetaExist && !oldMetaExist && !newMetaExist) // case 4
|| (backupMetaExist && oldMetaExist && newMetaExist) // case 8
) {
metaError("vgId:%d invalid meta state, please check", TD_VID(pVnode));
return TSDB_CODE_FAILED;
} else if ((backupMetaExist && oldMetaExist && !newMetaExist) // case 7
|| (!backupMetaExist && !oldMetaExist && !newMetaExist) // case 1
) {
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
} else if (backupMetaExist && !oldMetaExist && newMetaExist) {
if (taosRenameFile(newMetaPath, oldMetaPath) != 0) {
metaError("vgId:%d failed to rename new meta to old meta, reason:%s", TD_VID(pVnode), tstrerror(terrno));
return terrno;
}
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
} else {
int32_t code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
if (code) { if (code) {
return code; metaError("vgId:%d, %s failed at %s:%d since %s: rename %s to %s failed", TD_VID(pVnode), __func__, __FILE__,
} __LINE__, tstrerror(code), metaTempDir, metaDir);
code = metaGenerateNewMeta(ppMeta);
if (code) {
metaError("vgId:%d failed to generate new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
}
metaClose(ppMeta);
if (taosRenameFile(oldMetaPath, backupMetaPath) != 0) {
metaError("vgId:%d failed to rename old meta to backup, reason:%s", TD_VID(pVnode), tstrerror(terrno));
return terrno;
}
// rename the new meta to old meta
if (taosRenameFile(newMetaPath, oldMetaPath) != 0) {
metaError("vgId:%d failed to rename new meta to old meta, reason:%s", TD_VID(pVnode), tstrerror(terrno));
return terrno;
}
code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, false);
if (code) {
metaError("vgId:%d failed to open new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
return code; return code;
} }
} }
} else { // Do open meta
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback); code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
return code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -21,6 +21,7 @@ struct SMetaSnapReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
TBC* pTbc; TBC* pTbc;
int32_t iLoop;
}; };
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
@ -65,6 +66,22 @@ void metaSnapReaderClose(SMetaSnapReader** ppReader) {
} }
} }
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)data, size);
int32_t code = metaDecodeEntryImpl(&decoder, entry, true);
if (code) {
tDecoderClear(&decoder);
return code;
}
tDecoderClear(&decoder);
return 0;
}
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
const void* pKey = NULL; const void* pKey = NULL;
@ -72,19 +89,47 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t nKey = 0; int32_t nKey = 0;
int32_t nData = 0; int32_t nData = 0;
STbDbKey key; STbDbKey key;
int32_t c;
*ppData = NULL; *ppData = NULL;
for (;;) { while (pReader->iLoop < 2) {
if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) { if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) != 0 || ((STbDbKey*)pKey)->version > pReader->ever) {
pReader->iLoop++;
// Reopen the cursor to read from the beginning
tdbTbcClose(pReader->pTbc);
pReader->pTbc = NULL;
code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
goto _exit;
}
code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = pReader->sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
goto _exit;
}
continue;
}
// Decode meta entry
SMetaEntry entry = {0};
code = metaDecodeEntryHeader((void*)pData, nData, &entry);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
goto _exit; goto _exit;
} }
key = ((STbDbKey*)pKey)[0]; key = ((STbDbKey*)pKey)[0];
if (key.version > pReader->ever) { if (key.version < pReader->sver //
goto _exit; || (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE) // First loop send super table entry
} || (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE) // Second loop send non-super table entry
) {
if (key.version < pReader->sver) {
if (tdbTbcMoveToNext(pReader->pTbc) != 0) { if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode)); metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
} }

View File

@ -449,6 +449,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
} }
// open meta // open meta
(void)taosThreadRwlockInit(&pVnode->metaRWLock, NULL);
vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode)); vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) { if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
@ -548,6 +549,7 @@ _err:
if (pVnode->pMeta) metaClose(&pVnode->pMeta); if (pVnode->pMeta) metaClose(&pVnode->pMeta);
if (pVnode->freeList) vnodeCloseBufPool(pVnode); if (pVnode->freeList) vnodeCloseBufPool(pVnode);
(void)taosThreadRwlockDestroy(&pVnode->metaRWLock);
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
return NULL; return NULL;
} }

View File

@ -49,7 +49,7 @@ int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol
return 0; return 0;
} }
void vnodePrintTableMeta(STableMetaRsp* pMeta) { void vnodePrintTableMeta(STableMetaRsp *pMeta) {
if (!(qDebugFlag & DEBUG_DEBUG)) { if (!(qDebugFlag & DEBUG_DEBUG)) {
return; return;
} }
@ -70,14 +70,13 @@ void vnodePrintTableMeta(STableMetaRsp* pMeta) {
qDebug("sysInfo:%d", pMeta->sysInfo); qDebug("sysInfo:%d", pMeta->sysInfo);
if (pMeta->pSchemas) { if (pMeta->pSchemas) {
for (int32_t i = 0; i < (pMeta->numOfColumns + pMeta->numOfTags); ++i) { for (int32_t i = 0; i < (pMeta->numOfColumns + pMeta->numOfTags); ++i) {
SSchema* pSchema = pMeta->pSchemas + i; SSchema *pSchema = pMeta->pSchemas + i;
qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes, pSchema->name); qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags,
pSchema->colId, pSchema->bytes, pSchema->name);
} }
} }
} }
int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
STableInfoReq infoReq = {0}; STableInfoReq infoReq = {0};
STableMetaRsp metaRsp = {0}; STableMetaRsp metaRsp = {0};
@ -528,6 +527,13 @@ _exit:
return code; return code;
} }
#define VNODE_DO_META_QUERY(pVnode, cmd) \
do { \
(void)taosThreadRwlockRdlock(&(pVnode)->metaRWLock); \
cmd; \
(void)taosThreadRwlockUnlock(&(pVnode)->metaRWLock); \
} while (0)
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
SSyncState state = syncGetState(pVnode->sync); SSyncState state = syncGetState(pVnode->sync);
pLoad->syncAppliedIndex = pVnode->state.applied; pLoad->syncAppliedIndex = pVnode->state.applied;
@ -543,8 +549,8 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->learnerProgress = state.progress; pLoad->learnerProgress = state.progress;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); VNODE_DO_META_QUERY(pVnode, pLoad->numOfTables = metaGetTbNum(pVnode->pMeta));
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1); VNODE_DO_META_QUERY(pVnode, pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1));
pLoad->totalStorage = (int64_t)3 * 1073741824; pLoad->totalStorage = (int64_t)3 * 1073741824;
pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->compStorage = (int64_t)2 * 1073741824;
pLoad->pointsWritten = 100; pLoad->pointsWritten = 100;

View File

@ -202,9 +202,9 @@ SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode*
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName); SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t maxSpeed); SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t maxSpeed);
SNode* createS3MigrateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName); SNode* createS3MigrateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd); SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd, bool metaOnly);
SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart, SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart,
SNode* pEnd); SNode* pEnd, bool metaOnly);
SNode* createDefaultTableOptions(SAstCreateContext* pCxt); SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
SNode* createAlterTableOptions(SAstCreateContext* pCxt); SNode* createAlterTableOptions(SAstCreateContext* pCxt);
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal); SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal);

View File

@ -242,8 +242,13 @@ cmd ::= ALTER DATABASE db_name(A) alter_db_options(B).
cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); } cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); }
cmd ::= TRIM DATABASE db_name(A) speed_opt(B). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A, B); } cmd ::= TRIM DATABASE db_name(A) speed_opt(B). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A, B); }
cmd ::= S3MIGRATE DATABASE db_name(A). { pCxt->pRootNode = createS3MigrateDatabaseStmt(pCxt, &A); } cmd ::= S3MIGRATE DATABASE db_name(A). { pCxt->pRootNode = createS3MigrateDatabaseStmt(pCxt, &A); }
cmd ::= COMPACT DATABASE db_name(A) start_opt(B) end_opt(C). { pCxt->pRootNode = createCompactStmt(pCxt, &A, B, C); } cmd ::= COMPACT DATABASE db_name(A) start_opt(B) end_opt(C) meta_only(D). { pCxt->pRootNode = createCompactStmt(pCxt, &A, B, C, D); }
cmd ::= COMPACT db_name_cond_opt(A) VGROUPS IN NK_LP integer_list(B) NK_RP start_opt(C) end_opt(D). { pCxt->pRootNode = createCompactVgroupsStmt(pCxt, A, B, C, D); } cmd ::= COMPACT db_name_cond_opt(A) VGROUPS IN NK_LP integer_list(B) NK_RP start_opt(C) end_opt(D) meta_only(E). { pCxt->pRootNode = createCompactVgroupsStmt(pCxt, A, B, C, D, E); }
%type meta_only { bool }
%destructor meta_only { }
meta_only(A) ::= . { A = false; }
meta_only(A) ::= META_ONLY. { A = true; }
%type not_exists_opt { bool } %type not_exists_opt { bool }
%destructor not_exists_opt { } %destructor not_exists_opt { }

View File

@ -2212,7 +2212,7 @@ _err:
return NULL; return NULL;
} }
SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd) { SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd, bool metaOnly) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
CHECK_NAME(checkDbName(pCxt, pDbName, false)); CHECK_NAME(checkDbName(pCxt, pDbName, false));
SCompactDatabaseStmt* pStmt = NULL; SCompactDatabaseStmt* pStmt = NULL;
@ -2221,6 +2221,7 @@ SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName); COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
pStmt->pStart = pStart; pStmt->pStart = pStart;
pStmt->pEnd = pEnd; pStmt->pEnd = pEnd;
pStmt->metaOnly = metaOnly;
return (SNode*)pStmt; return (SNode*)pStmt;
_err: _err:
nodesDestroyNode(pStart); nodesDestroyNode(pStart);
@ -2229,7 +2230,7 @@ _err:
} }
SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart, SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart,
SNode* pEnd) { SNode* pEnd, bool metaOnly) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (NULL == pDbName) { if (NULL == pDbName) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "database not specified"); snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "database not specified");
@ -2243,6 +2244,7 @@ SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeLi
pStmt->vgidList = vgidList; pStmt->vgidList = vgidList;
pStmt->pStart = pStart; pStmt->pStart = pStart;
pStmt->pEnd = pEnd; pStmt->pEnd = pEnd;
pStmt->metaOnly = metaOnly;
return (SNode*)pStmt; return (SNode*)pStmt;
_err: _err:
nodesDestroyNode(pDbName); nodesDestroyNode(pDbName);

View File

@ -361,7 +361,8 @@ static SKeyword keywordTable[] = {
{"NOTIFY_HISTORY", TK_NOTIFY_HISTORY}, {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
{"REGEXP", TK_REGEXP}, {"REGEXP", TK_REGEXP},
{"ASSIGN", TK_ASSIGN}, {"ASSIGN", TK_ASSIGN},
{"TRUE_FOR", TK_TRUE_FOR} {"TRUE_FOR", TK_TRUE_FOR},
{"META_ONLY", TK_META_ONLY}
}; };
// clang-format on // clang-format on

View File

@ -11319,7 +11319,9 @@ static int32_t translateCompactRange(STranslateContext* pCxt, const char* dbName
} }
static int32_t translateCompactDb(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) { static int32_t translateCompactDb(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) {
SCompactDbReq compactReq = {0}; SCompactDbReq compactReq = {
.metaOnly = pStmt->metaOnly,
};
SName name; SName name;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
@ -11388,7 +11390,9 @@ static int32_t translateVgroupList(STranslateContext* pCxt, SNodeList* vgroupLis
static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsStmt* pStmt) { static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SName name; SName name;
SCompactDbReq req = {0}; SCompactDbReq req = {
.metaOnly = pStmt->metaOnly,
};
code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal,
strlen(((SValueNode*)pStmt->pDbName)->literal)); strlen(((SValueNode*)pStmt->pDbName)->literal));

View File

@ -1813,3 +1813,6 @@
,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py ,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py
,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py ,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py ,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
# new test
,,y,test_new,./pytest.sh python3 ./test.py -f storage/compact/test_compact_meta.py

View File

@ -22,6 +22,7 @@ from util.log import *
import platform import platform
import ast import ast
class TDCase: class TDCase:
def __init__(self, name, case): def __init__(self, name, case):
self.name = name self.name = name
@ -55,7 +56,8 @@ class TDCases:
with open(file_path, "r", encoding="utf-8") as file: with open(file_path, "r", encoding="utf-8") as file:
tree = ast.parse(file.read(), filename=file_path) tree = ast.parse(file.read(), filename=file_path)
classes = [node.name for node in ast.walk(tree) if isinstance(node, ast.ClassDef)] classes = [node.name for node in ast.walk(
tree) if isinstance(node, ast.ClassDef)]
return classes return classes
def runAllLinux(self, conn): def runAllLinux(self, conn):
@ -123,7 +125,7 @@ class TDCases:
class_names = self.get_local_classes_in_order(fileName) class_names = self.get_local_classes_in_order(fileName)
case_class = getattr(testModule, class_names[-1]) case_class = getattr(testModule, class_names[-1])
case = case_class() case = case_class()
case.init(conn, self._logSql,replicaVar) case.init(conn, self._logSql, replicaVar)
try: try:
case.run() case.run()
except Exception as e: except Exception as e:
@ -201,14 +203,15 @@ class TDCases:
buildPath = tdCases.getTaosBenchmarkPath() buildPath = tdCases.getTaosBenchmarkPath()
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
cmdStr1 = ' mintty -h never %s %s '%(buildPath, param) cmdStr1 = ' mintty -h never %s %s ' % (buildPath, param)
tdLog.info(cmdStr1) tdLog.info(cmdStr1)
os.system(cmdStr1) os.system(cmdStr1)
else: else:
cmdStr1 = '%s %s &'%(buildPath, param) cmdStr1 = '%s %s &' % (buildPath, param)
tdLog.info(cmdStr1) tdLog.info(cmdStr1)
os.system(cmdStr1) os.system(cmdStr1)
time.sleep(5) time.sleep(5)
tdCases = TDCases() tdCases = TDCases()

View File

@ -0,0 +1,248 @@
# tests/test_new/xxx/xxx/test_xxx.py
# import ...
'''
./pytest.sh python3 ./test.py -f storage/compact/test_compact_meta.py
'''
import taos
import sys
import time
from math import inf
from util.dnodes import tdDnodes
from util.sql import *
from util.cases import *
from util.log import *
import inspect
import random
sys.path.append("../tests/pytest")
class TestCompactMeta:
    # Validates `COMPACT DATABASE ... META_ONLY` (TS-5445): after a
    # meta-only compaction, child-table tags and the super-table schema
    # must survive, and the vnode must still accept writes and queries.

    def caseDescription(self):
        '''
        case1<Hongze Cheng>: [TS-5445] Compact Meta Data
        '''
        return

    def init(self, conn, logSql, replicaVer=1):
        # Framework entry point: bind the shared SQL helper to a cursor.
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), True)
        self.conn = conn

    def run(self):
        self.test_case1()
        self.test_case2()

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

    # ---------- shared helpers ----------

    def _prepare_db(self, db_name, stb_name):
        # Recreate a single-vgroup database holding one super table
        # (ts timestamp, c1 int, c2 int) tags(t1 int).
        tdSql.execute(f'drop database if exists {db_name}')
        tdSql.execute(f'create database {db_name} vgroups 1')
        tdSql.execute(
            f'create table {db_name}.{stb_name} (ts timestamp, c1 int, c2 int) tags(t1 int)')

    def _create_child_tables(self, case, db_name, stb_name, prefix, count):
        # Create child tables prefix1..prefixN with tag t1 = i.
        tdLog.info(f'case {case}: create {count} child tables')
        for i in range(1, count + 1):
            if i % 100 == 0:
                tdLog.info(f'case {case}: create {i} child tables')
            tdSql.execute(
                f'create table {db_name}.{prefix}{i} using {db_name}.{stb_name} tags({i})')

    def _insert_rows(self, case, db_name, prefix, count, values_clause):
        # Insert one row into every child table; values_clause is the SQL
        # tail after the table name, e.g. "values(now, 1, 2)".
        tdLog.info(f'case {case}: insert data to child tables')
        for i in range(1, count + 1):
            if i % 100 == 0:
                tdLog.info(f'case {case}: insert data to {i} child tables')
            tdSql.execute(f'insert into {db_name}.{prefix}{i} {values_clause}')

    def _compact_meta_and_wait(self, case, db_name):
        # Issue a meta-only compaction and block until it no longer shows
        # up in SHOW COMPACTS.
        tdLog.info(f'case {case}: start to compact meta')
        tdSql.execute(f'compact database {db_name} meta_only')
        tdLog.info(f'case {case}: wait compact is done')
        while tdSql.query('show compacts') != 0:
            time.sleep(1)

    def _check_counts(self, db_name, stb_name, tag_values, expected):
        # Each tag value must select exactly one child table holding
        # `expected` rows (count(*) per tag).
        for t in tag_values:
            tdSql.query(f'select count(*) from {db_name}.{stb_name} where t1 = {t}')
            tdSql.checkData(0, 0, expected)

    # ---------- cases ----------

    def test_case1(self):
        """
        1. Create 10000 child tables and write one row each.
        2. Bump every child table's tag (t1: i -> i+1).
        3. COMPACT ... META_ONLY, then verify the altered tags and the data
           survive, and that new writes still work.
        """
        case = inspect.currentframe().f_code.co_name
        tdLog.info(f'case {case} start')
        db_name = 'db1'
        stb_name = 'stb1'
        ctb_name_prefix = 'ctb'
        num_child_tables = 10000

        self._prepare_db(db_name, stb_name)
        self._create_child_tables(case, db_name, stb_name,
                                  ctb_name_prefix, num_child_tables)
        self._insert_rows(case, db_name, ctb_name_prefix,
                          num_child_tables, 'values(now, 1, 2)')

        # Alter every child table's tag from i to i+1.
        tdLog.info(f'case {case}: alter child table tags')
        for i in range(1, num_child_tables + 1):
            if i % 100 == 0:
                tdLog.info(f'case {case}: altered {i} child tables')
            tdSql.execute(
                f'alter table {db_name}.{ctb_name_prefix}{i} set tag t1 = {i + 1}')

        # Tags were bumped above, so table i is now addressed by t1 = i+1.
        tdLog.info(f'case {case}: randomly select 100 child tables to query')
        selected_tables = random.sample(range(1, num_child_tables + 1), 100)
        self._check_counts(db_name, stb_name,
                           [i + 1 for i in selected_tables], 1)

        self._compact_meta_and_wait(case, db_name)

        # After compaction the vnode must still accept writes...
        self._insert_rows(case, db_name, ctb_name_prefix,
                          num_child_tables, 'values(now, 1, 2)')
        # ...and the altered tags must still resolve to the same tables.
        tdLog.info(f'case {case}: query data again to verify')
        self._check_counts(db_name, stb_name,
                           [i + 1 for i in selected_tables], 2)

    def test_case2(self):
        """
        1. Create 1000 child tables and write one row each (ts, c1 only).
        2. Churn the super-table schema (add then drop a column, ~2000x).
        3. COMPACT ... META_ONLY, then verify schema and tags survive and
           new writes still work.
        """
        case = inspect.currentframe().f_code.co_name
        tdLog.info(f'case {case} start')
        db_name = 'db2'
        stb_name = 'stb2'
        ctb_name_prefix = 'ctb'
        num_child_tables = 1000

        self._prepare_db(db_name, stb_name)
        self._create_child_tables(case, db_name, stb_name,
                                  ctb_name_prefix, num_child_tables)
        self._insert_rows(case, db_name, ctb_name_prefix,
                          num_child_tables, '(ts, c1) values (now, 1)')

        # Churn the super-table schema: add column c_i then drop it again,
        # leaving the net schema unchanged but generating many meta entries.
        tdLog.info(f'case {case}: alter super table schema')
        for i in range(3, 2000):
            if i % 100 == 0:
                tdLog.info(
                    f'case {case}: altered {i} times of super table schema')
            tdSql.execute(f'alter table {db_name}.{stb_name} add column c{i} int')
            tdSql.execute(f'alter table {db_name}.{stb_name} drop column c{i}')

        tdLog.info(f'case {case}: randomly select 100 child tables to query')
        selected_tables = random.sample(range(1, num_child_tables + 1), 100)
        # BUGFIX: tags are never altered in this case, so child table i still
        # has t1 = i. The previous predicate `t1 = i + 1` queried the wrong
        # table and matched nothing when i == num_child_tables.
        self._check_counts(db_name, stb_name, selected_tables, 1)

        self._compact_meta_and_wait(case, db_name)

        # After compaction the vnode must still accept full-row writes
        # (the net schema is still (ts, c1, c2))...
        self._insert_rows(case, db_name, ctb_name_prefix,
                          num_child_tables, 'values(now, 1, 2)')
        # ...and each selected table must now hold two rows.
        tdLog.info(f'case {case}: query data again to verify')
        self._check_counts(db_name, stb_name, selected_tables, 2)
# Register the case with the framework for both platforms; each platform
# receives its own instance, as before.
for _register in (tdCases.addWindows, tdCases.addLinux):
    _register(__file__, TestCompactMeta())

View File

@ -24,7 +24,6 @@ import platform
import socket import socket
import threading import threading
import importlib import importlib
import ast
print(f"Python version: {sys.version}") print(f"Python version: {sys.version}")
print(f"Version info: {sys.version_info}") print(f"Version info: {sys.version_info}")
@ -40,14 +39,15 @@ import taos
import taosrest import taosrest
import taosws import taosws
def checkRunTimeError(): def checkRunTimeError():
import win32gui import win32gui
timeCount = 0 timeCount = 0
while 1: while 1:
time.sleep(1) time.sleep(1)
timeCount = timeCount + 1 timeCount = timeCount + 1
print("checkRunTimeError",timeCount) print("checkRunTimeError", timeCount)
if (timeCount>1200): if (timeCount > 1200):
print("stop the test.") print("stop the test.")
os.system("TASKKILL /F /IM taosd.exe") os.system("TASKKILL /F /IM taosd.exe")
os.system("TASKKILL /F /IM taos.exe") os.system("TASKKILL /F /IM taos.exe")
@ -55,15 +55,18 @@ def checkRunTimeError():
os.system("TASKKILL /F /IM mintty.exe") os.system("TASKKILL /F /IM mintty.exe")
os.system("TASKKILL /F /IM python.exe") os.system("TASKKILL /F /IM python.exe")
quit(0) quit(0)
hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") hwnd = win32gui.FindWindow(
None, "Microsoft Visual C++ Runtime Library")
if hwnd: if hwnd:
os.system("TASKKILL /F /IM taosd.exe") os.system("TASKKILL /F /IM taosd.exe")
def get_local_classes_in_order(file_path): def get_local_classes_in_order(file_path):
with open(file_path, "r", encoding="utf-8") as file: with open(file_path, "r", encoding="utf-8") as file:
tree = ast.parse(file.read(), filename=file_path) tree = ast.parse(file.read(), filename=file_path)
classes = [node.name for node in ast.walk(tree) if isinstance(node, ast.ClassDef)] classes = [node.name for node in ast.walk(
tree) if isinstance(node, ast.ClassDef)]
return classes return classes
@ -74,6 +77,8 @@ def dynamicLoadModule(fileName):
# #
# run case on previous cluster # run case on previous cluster
# #
def runOnPreviousCluster(host, config, fileName): def runOnPreviousCluster(host, config, fileName):
print("enter run on previeous") print("enter run on previeous")
@ -130,7 +135,7 @@ if __name__ == "__main__":
previousCluster = False previousCluster = False
crashGen = False crashGen = False
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP:G', [ opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP:G', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode','previous',"crashGen"]) 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd', 'dnodeNums', 'mnodeNums', 'queryPolicy', 'createDnodeNums', 'restful', 'websocket', 'adaptercfgupdate', 'replicaVar', 'independentMnode', 'previous', "crashGen"])
for key, value in opts: for key, value in opts:
if key in ['-h', '--help']: if key in ['-h', '--help']:
tdLog.printNoPrefix( tdLog.printNoPrefix(
@ -156,7 +161,8 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-n the number of replicas') tdLog.printNoPrefix('-n the number of replicas')
tdLog.printNoPrefix('-i independentMnode Mnode') tdLog.printNoPrefix('-i independentMnode Mnode')
tdLog.printNoPrefix('-a address sanitizer mode') tdLog.printNoPrefix('-a address sanitizer mode')
tdLog.printNoPrefix('-P run case with [P]revious cluster, do not create new cluster to run case.') tdLog.printNoPrefix(
'-P run case with [P]revious cluster, do not create new cluster to run case.')
tdLog.printNoPrefix('-G crashGen mode') tdLog.printNoPrefix('-G crashGen mode')
sys.exit(0) sys.exit(0)
@ -234,7 +240,8 @@ if __name__ == "__main__":
if key in ['-D', '--adaptercfgupdate']: if key in ['-D', '--adaptercfgupdate']:
try: try:
adaptercfgupdate = eval(base64.b64decode(value.encode()).decode()) adaptercfgupdate = eval(
base64.b64decode(value.encode()).decode())
except: except:
print('adapter cfg update convert fail.') print('adapter cfg update convert fail.')
sys.exit(0) sys.exit(0)
@ -248,7 +255,6 @@ if __name__ == "__main__":
if key in ['-G', '--crashGen']: if key in ['-G', '--crashGen']:
crashGen = True crashGen = True
# #
# do exeCmd command # do exeCmd command
# #
@ -275,7 +281,7 @@ if __name__ == "__main__":
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
while(processID): while (processID):
os.system(killCmd) os.system(killCmd)
time.sleep(1) time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
@ -302,7 +308,7 @@ if __name__ == "__main__":
# psCmd = f"pgrep {toBeKilled}" # psCmd = f"pgrep {toBeKilled}"
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
while(processID): while (processID):
os.system(killCmd) os.system(killCmd)
time.sleep(1) time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
@ -349,7 +355,7 @@ if __name__ == "__main__":
if platform.system().lower() == 'windows': if platform.system().lower() == 'windows':
fileName = fileName.replace("/", os.sep) fileName = fileName.replace("/", os.sep)
if (masterIp == "" and not fileName == "0-others\\udf_create.py"): if (masterIp == "" and not fileName == "0-others\\udf_create.py"):
threading.Thread(target=checkRunTimeError,daemon=True).start() threading.Thread(target=checkRunTimeError, daemon=True).start()
tdLog.info("Procedures for testing self-deployment") tdLog.info("Procedures for testing self-deployment")
tdDnodes.init(deployPath, masterIp) tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster) tdDnodes.setTestCluster(testCluster)
@ -372,7 +378,8 @@ if __name__ == "__main__":
ucase = case_class() ucase = case_class()
if ((json.dumps(updateCfgDict) == '{}') and hasattr(ucase, 'updatecfgDict')): if ((json.dumps(updateCfgDict) == '{}') and hasattr(ucase, 'updatecfgDict')):
updateCfgDict = ucase.updatecfgDict updateCfgDict = ucase.updatecfgDict
updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode() updateCfgDictStr = "-d %s" % base64.b64encode(
json.dumps(updateCfgDict).encode()).decode()
if ((json.dumps(adapter_cfg_dict) == '{}') and hasattr(ucase, 'taosadapter_cfg_dict')): if ((json.dumps(adapter_cfg_dict) == '{}') and hasattr(ucase, 'taosadapter_cfg_dict')):
adapter_cfg_dict = ucase.taosadapter_cfg_dict adapter_cfg_dict = ucase.taosadapter_cfg_dict
# adapter_cfg_dict_str = f"-D {base64.b64encode(toml.dumps(adapter_cfg_dict).encode()).decode()}" # adapter_cfg_dict_str = f"-D {base64.b64encode(toml.dumps(adapter_cfg_dict).encode()).decode()}"
@ -384,8 +391,8 @@ if __name__ == "__main__":
tAdapter.init(deployPath, masterIp) tAdapter.init(deployPath, masterIp)
tAdapter.stop(force_kill=True) tAdapter.stop(force_kill=True)
if dnodeNums == 1 : if dnodeNums == 1:
tdDnodes.deploy(1,updateCfgDict) tdDnodes.deploy(1, updateCfgDict)
tdDnodes.start(1) tdDnodes.start(1)
tdCases.logSql(logSql) tdCases.logSql(logSql)
if restful or websocket: if restful or websocket:
@ -393,13 +400,15 @@ if __name__ == "__main__":
tAdapter.start() tAdapter.start()
if queryPolicy != 1: if queryPolicy != 1:
queryPolicy=int(queryPolicy) queryPolicy = int(queryPolicy)
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(
f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) conn = taos.connect(host, config=tdDnodes.getSimCfgPath())
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("create qnode on dnode 1") cursor.execute("create qnode on dnode 1")
@ -407,16 +416,20 @@ if __name__ == "__main__":
cursor.execute("show local variables") cursor.execute("show local variables")
res = cursor.fetchall() res = cursor.fetchall()
for i in range(cursor.rowcount): for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" : if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy): if int(res[i][1]) == int(queryPolicy):
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') tdLog.info(
f'alter queryPolicy to {queryPolicy} successfully')
cursor.close() cursor.close()
else: else:
tdLog.debug(res) tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") tdLog.exit(
else : f"alter queryPolicy to {queryPolicy} failed")
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) else:
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode) tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode" % (
dnodeNums, mnodeNums))
dnodeslist = cluster.configure_cluster(
dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
tdDnodes = ClusterDnodes(dnodeslist) tdDnodes = ClusterDnodes(dnodeslist)
tdDnodes.init(deployPath, masterIp) tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster) tdDnodes.setTestCluster(testCluster)
@ -433,31 +446,34 @@ if __name__ == "__main__":
tAdapter.start() tAdapter.start()
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) conn = taos.connect(host, config=tdDnodes.getSimCfgPath())
# tdLog.info(tdDnodes.getSimCfgPath(),host) # tdLog.info(tdDnodes.getSimCfgPath(),host)
if createDnodeNums == 1: if createDnodeNums == 1:
createDnodeNums=dnodeNums createDnodeNums = dnodeNums
else: else:
createDnodeNums=createDnodeNums createDnodeNums = createDnodeNums
cluster.create_dnode(conn,createDnodeNums) cluster.create_dnode(conn, createDnodeNums)
cluster.create_mnode(conn,mnodeNums) cluster.create_mnode(conn, mnodeNums)
try: try:
if cluster.check_dnode(conn) : if cluster.check_dnode(conn):
print("check dnode ready") print("check dnode ready")
except Exception as r: except Exception as r:
print(r) print(r)
if queryPolicy != 1: if queryPolicy != 1:
queryPolicy=int(queryPolicy) queryPolicy = int(queryPolicy)
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(
f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) conn = taos.connect(host, config=tdDnodes.getSimCfgPath())
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("create qnode on dnode 1") cursor.execute("create qnode on dnode 1")
@ -465,23 +481,27 @@ if __name__ == "__main__":
cursor.execute("show local variables") cursor.execute("show local variables")
res = cursor.fetchall() res = cursor.fetchall()
for i in range(cursor.rowcount): for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" : if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy): if int(res[i][1]) == int(queryPolicy):
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') tdLog.info(
f'alter queryPolicy to {queryPolicy} successfully')
cursor.close() cursor.close()
else: else:
tdLog.debug(res) tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") tdLog.exit(
f"alter queryPolicy to {queryPolicy} failed")
if ucase is not None and hasattr(ucase, 'noConn') and ucase.noConn == True: if ucase is not None and hasattr(ucase, 'noConn') and ucase.noConn == True:
conn = None conn = None
else: else:
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
host=f"{host}", config=tdDnodes.getSimCfgPath())
if testCluster: if testCluster:
tdLog.info("Procedures for testing cluster") tdLog.info("Procedures for testing cluster")
@ -492,11 +512,13 @@ if __name__ == "__main__":
else: else:
tdLog.info("Procedures for testing self-deployment") tdLog.info("Procedures for testing self-deployment")
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
host=f"{host}", config=tdDnodes.getSimCfgPath())
if fileName == "all": if fileName == "all":
tdCases.runAllWindows(conn) tdCases.runAllWindows(conn)
@ -513,14 +535,19 @@ if __name__ == "__main__":
tdDnodes.start(1) tdDnodes.start(1)
time.sleep(1) time.sleep(1)
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(
f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
tdLog.info("Procedures for tdengine deployed in %s" % (host)) host=f"{host}", config=tdDnodes.getSimCfgPath())
tdLog.info(
"Procedures for tdengine deployed in %s" % (host))
tdLog.info("query test after taosd restart") tdLog.info("query test after taosd restart")
tdCases.runOneWindows(conn, sp[0] + "_" + "restart.py", replicaVar) tdCases.runOneWindows(
conn, sp[0] + "_" + "restart.py", replicaVar)
else: else:
tdLog.info("not need to query") tdLog.info("not need to query")
else: else:
@ -554,9 +581,9 @@ if __name__ == "__main__":
tAdapter.init(deployPath, masterIp) tAdapter.init(deployPath, masterIp)
tAdapter.stop(force_kill=True) tAdapter.stop(force_kill=True)
if dnodeNums == 1 : if dnodeNums == 1:
# dnode is one # dnode is one
tdDnodes.deploy(1,updateCfgDict) tdDnodes.deploy(1, updateCfgDict)
tdDnodes.start(1) tdDnodes.start(1)
tdCases.logSql(logSql) tdCases.logSql(logSql)
@ -565,13 +592,16 @@ if __name__ == "__main__":
tAdapter.start() tAdapter.start()
if queryPolicy != 1: if queryPolicy != 1:
queryPolicy=int(queryPolicy) queryPolicy = int(queryPolicy)
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(
f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
host=f"{host}", config=tdDnodes.getSimCfgPath())
# tdSql.init(conn.cursor()) # tdSql.init(conn.cursor())
# tdSql.execute("create qnode on dnode 1") # tdSql.execute("create qnode on dnode 1")
# tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) # tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
@ -590,19 +620,23 @@ if __name__ == "__main__":
cursor.execute("show local variables") cursor.execute("show local variables")
res = cursor.fetchall() res = cursor.fetchall()
for i in range(cursor.rowcount): for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" : if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy): if int(res[i][1]) == int(queryPolicy):
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') tdLog.info(
f'alter queryPolicy to {queryPolicy} successfully')
cursor.close() cursor.close()
else: else:
tdLog.debug(res) tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") tdLog.exit(
f"alter queryPolicy to {queryPolicy} failed")
else : else:
# dnode > 1 cluster # dnode > 1 cluster
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode" % (
print(independentMnode,"independentMnode valuse") dnodeNums, mnodeNums))
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode) print(independentMnode, "independentMnode valuse")
dnodeslist = cluster.configure_cluster(
dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
tdDnodes = ClusterDnodes(dnodeslist) tdDnodes = ClusterDnodes(dnodeslist)
tdDnodes.init(deployPath, masterIp) tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster) tdDnodes.setTestCluster(testCluster)
@ -610,7 +644,7 @@ if __name__ == "__main__":
tdDnodes.setAsan(asan) tdDnodes.setAsan(asan)
tdDnodes.stopAll() tdDnodes.stopAll()
for dnode in tdDnodes.dnodes: for dnode in tdDnodes.dnodes:
tdDnodes.deploy(dnode.index,updateCfgDict) tdDnodes.deploy(dnode.index, updateCfgDict)
for dnode in tdDnodes.dnodes: for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index) tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql) tdCases.logSql(logSql)
@ -621,34 +655,39 @@ if __name__ == "__main__":
# create taos connect # create taos connect
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
print(tdDnodes.getSimCfgPath(),host) host=f"{host}", config=tdDnodes.getSimCfgPath())
print(tdDnodes.getSimCfgPath(), host)
if createDnodeNums == 1: if createDnodeNums == 1:
createDnodeNums=dnodeNums createDnodeNums = dnodeNums
else: else:
createDnodeNums=createDnodeNums createDnodeNums = createDnodeNums
cluster.create_dnode(conn,createDnodeNums) cluster.create_dnode(conn, createDnodeNums)
cluster.create_mnode(conn,mnodeNums) cluster.create_mnode(conn, mnodeNums)
try: try:
if cluster.check_dnode(conn) : if cluster.check_dnode(conn):
print("check dnode ready") print("check dnode ready")
except Exception as r: except Exception as r:
print(r) print(r)
# do queryPolicy option # do queryPolicy option
if queryPolicy != 1: if queryPolicy != 1:
queryPolicy=int(queryPolicy) queryPolicy = int(queryPolicy)
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(
f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
host=f"{host}", config=tdDnodes.getSimCfgPath())
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("create qnode on dnode 1") cursor.execute("create qnode on dnode 1")
@ -656,14 +695,15 @@ if __name__ == "__main__":
cursor.execute("show local variables") cursor.execute("show local variables")
res = cursor.fetchall() res = cursor.fetchall()
for i in range(cursor.rowcount): for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" : if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy): if int(res[i][1]) == int(queryPolicy):
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') tdLog.info(
f'alter queryPolicy to {queryPolicy} successfully')
cursor.close() cursor.close()
else: else:
tdLog.debug(res) tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") tdLog.exit(
f"alter queryPolicy to {queryPolicy} failed")
# run case # run case
if testCluster: if testCluster:
@ -675,11 +715,13 @@ if __name__ == "__main__":
else: else:
tdLog.info("Procedures for testing self-deployment") tdLog.info("Procedures for testing self-deployment")
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
host=f"{host}", config=tdDnodes.getSimCfgPath())
if fileName == "all": if fileName == "all":
tdCases.runAllLinux(conn) tdCases.runAllLinux(conn)
@ -697,14 +739,19 @@ if __name__ == "__main__":
tdDnodes.start(1) tdDnodes.start(1)
time.sleep(1) time.sleep(1)
if restful: if restful:
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") conn = taosrest.connect(
url=f"http://{host}:6041", timezone="utc")
elif websocket: elif websocket:
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") conn = taosws.connect(
f"taosws://root:taosdata@{host}:6041")
else: else:
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) conn = taos.connect(
tdLog.info("Procedures for tdengine deployed in %s" % (host)) host=f"{host}", config=tdDnodes.getSimCfgPath())
tdLog.info(
"Procedures for tdengine deployed in %s" % (host))
tdLog.info("query test after taosd restart") tdLog.info("query test after taosd restart")
tdCases.runOneLinux(conn, sp[0] + "_" + "restart.py", replicaVar) tdCases.runOneLinux(
conn, sp[0] + "_" + "restart.py", replicaVar)
else: else:
tdLog.info("not need to query") tdLog.info("not need to query")