Merge branch '3.0' into merge/mainto3.0

commit 7326e3d8bc

@@ -20,9 +20,9 @@ if(${BUILD_WITH_SQLITE})
add_subdirectory(sqlite)
endif(${BUILD_WITH_SQLITE})

if(${BUILD_S3})
add_subdirectory(azure)
endif()
# if(${BUILD_S3})
# add_subdirectory(azure)
# endif()

add_subdirectory(tdev)
add_subdirectory(lz4)

@@ -16,8 +16,8 @@ TDengine is designed for various writing scenarios, and many of these scenarios
### Syntax

```sql
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'];
COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY'];
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
SHOW COMPACTS;
SHOW COMPACT compact_id;
KILL COMPACT compact_id;
```

@@ -30,6 +30,7 @@ KILL COMPACT compact_id;
- COMPACT will merge multiple STT files
- You can specify the start time of the data to compact with the `start with` keyword
- You can specify the end time of the data to compact with the `end with` keyword
- You can specify the `META_ONLY` keyword to compact only the metadata, which is not compacted by default
- The COMPACT command will return the ID of the COMPACT task
- COMPACT tasks are executed asynchronously in the background, and you can view the progress of COMPACT tasks using the SHOW COMPACTS command
- The SHOW command will return the ID of the COMPACT task, and you can terminate the COMPACT task using the KILL COMPACT command (see the example below)

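The following is a minimal usage sketch of the statements documented above. The database name `db1`, the vgroup IDs, the timestamp literals, and the compact ID `1` are illustrative placeholders, not values taken from this commit.

```sql
-- Compact only the metadata of database db1; data files are left untouched
COMPACT DATABASE db1 META_ONLY;

-- Compact a bounded time range of two specific vgroups
COMPACT db1.VGROUPS IN (2, 3) start with '2024-01-01 00:00:00' end with '2024-06-30 23:59:59';

-- The task runs asynchronously; monitor it and cancel it if necessary
SHOW COMPACTS;
SHOW COMPACT 1;
KILL COMPACT 1;
```
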
@@ -23,11 +23,11 @@ The list of keywords is as follows:
| ALIVE | |
| ALL | |
| ALTER | |
| ANALYZE | Version 3.3.4.3 and later |
| ANALYZE | 3.3.4.3+ |
| AND | |
| ANODE | Version 3.3.4.3 and later |
| ANODES | Version 3.3.4.3 and later |
| ANOMALY_WINDOW | Version 3.3.4.3 and later |
| ANODE | 3.3.4.3+ |
| ANODES | 3.3.4.3+ |
| ANOMALY_WINDOW | 3.3.4.3+ |
| ANTI | |
| APPS | |
| ARBGROUPS | |

@@ -37,7 +37,8 @@ The list of keywords is as follows:
| ASOF | |
| AT_ONCE | |
| ATTACH | |
| ASSIGN | Version 3.3.6.0 and later |
| AUTO | 3.3.5.0+ |
| ASSIGN | 3.3.6.0+ |

### B

@@ -79,12 +80,16 @@ The list of keywords is as follows:
| CLIENT_VERSION | |
| CLUSTER | |
| COLON | |
| COLS | 3.3.6.0+ |
| COLUMN | |
| COMMA | |
| COMMENT | |
| COMP | |
| COMPACT | |
| COMPACTS | |
| COMPACT_INTERVAL | 3.3.5.0+ |
| COMPACT_TIME_OFFSET | 3.3.5.0+ |
| COMPACT_TIME_RANGE | 3.3.5.0+ |
| CONCAT | |
| CONFLICT | |
| CONNECTION | |

@@ -115,6 +120,7 @@ The list of keywords is as follows:
| DESC | |
| DESCRIBE | |
| DETACH | |
| DISK_INFO | 3.3.5.0+ |
| DISTINCT | |
| DISTRIBUTED | |
| DIVIDE | |

@@ -149,19 +155,19 @@ The list of keywords is as follows:
|Keyword|Description|
|----------------------|-|
| FAIL | |
| FHIGH | Version 3.3.4.3 and later |
| FHIGH | 3.3.4.3+ |
| FILE | |
| FILL | |
| FILL_HISTORY | |
| FIRST | |
| FLOAT | |
| FLOW | Version 3.3.4.3 and later |
| FLOW | 3.3.4.3+ |
| FLUSH | |
| FOR | |
| FORCE | |
| FORCE_WINDOW_CLOSE | Version 3.3.4.3 and later |
| FORCE_WINDOW_CLOSE | 3.3.4.3+ |
| FROM | |
| FROWTS | Version 3.3.4.3 and later |
| FROWTS | 3.3.4.3+ |
| FULL | |
| FUNCTION | |
| FUNCTIONS | |

@@ -210,6 +216,7 @@ The list of keywords is as follows:
| INTO | |
| IPTOKEN | |
| IROWTS | |
| IROWTS_ORIGIN | 3.3.5.0+ |
| IS | |
| IS_IMPORT | |
| ISFILLED | |

@@ -243,6 +250,7 @@ The list of keywords is as follows:
| LEADER | |
| LEADING | |
| LEFT | |
| LEVEL | 3.3.0.0 - 3.3.2.11 |
| LICENCES | |
| LIKE | |
| LIMIT | |

@@ -264,6 +272,7 @@ The list of keywords is as follows:
| MEDIUMBLOB | |
| MERGE | |
| META | |
| META_ONLY | 3.3.6.0+ |
| MINROWS | |
| MINUS | |
| MNODE | |

@@ -282,6 +291,8 @@ The list of keywords is as follows:
| NONE | |
| NORMAL | |
| NOT | |
| NOTIFY | 3.3.6.0+ |
| NOTIFY_HISTORY | 3.3.6.0+ |
| NOTNULL | |
| NOW | |
| NULL | |

@@ -296,6 +307,7 @@ The list of keywords is as follows:
| OFFSET | |
| ON | |
| ONLY | |
| ON_FAILURE | 3.3.6.0+ |
| OR | |
| ORDER | |
| OUTER | |

@@ -346,6 +358,7 @@ The list of keywords is as follows:
| RATIO | |
| READ | |
| RECURSIVE | |
| REGEXP | 3.3.6.0+ |
| REDISTRIBUTE | |
| REM | |
| REPLACE | |

@@ -419,7 +432,7 @@ The list of keywords is as follows:
| TABLE_PREFIX | |
| TABLE_SUFFIX | |
| TABLES | |
| tag | |
| TAG | |
| TAGS | |
| TBNAME | |
| THEN | |

@@ -436,6 +449,7 @@ The list of keywords is as follows:
| TRANSACTIONS | |
| TRIGGER | |
| TRIM | |
| TRUE_FOR | 3.3.6.0+ |
| TSDB_PAGESIZE | |
| TSERIES | |
| TSMA | |

@@ -17,8 +17,8 @@ TDengine is designed for various writing scenarios, and many of these scenarios
### Syntax

```SQL
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'];
COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY'];
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
COMPACT [db_name.]VGROUPS IN (vgroup_id1, vgroup_id2, ...) [start with 'XXXX'] [end with 'YYYY'] [META_ONLY];
SHOW COMPACTS;
SHOW COMPACT compact_id;
KILL COMPACT compact_id;
```

@@ -32,6 +32,7 @@ KILL COMPACT compact_id;
- COMPACT merges multiple STT files
- The start time of the data to compact can be specified with the `start with` keyword
- The end time of the data to compact can be specified with the `end with` keyword
- The `META_ONLY` keyword can be used to compact only the metadata; metadata is not compacted by default
- The COMPACT command returns the ID of the COMPACT task
- COMPACT tasks run asynchronously in the background; their progress can be viewed with the SHOW COMPACTS command
- The SHOW command returns the ID of the COMPACT task, which can be terminated with the KILL COMPACT command

@ -23,11 +23,11 @@ description: TDengine 保留关键字的详细列表
|
|||
| ALIVE | |
|
||||
| ALL | |
|
||||
| ALTER | |
|
||||
| ANALYZE | 3.3.4.3 及后续版本 |
|
||||
| ANALYZE | 3.3.4.3+ |
|
||||
| AND | |
|
||||
| ANODE | 3.3.4.3 及后续版本 |
|
||||
| ANODES | 3.3.4.3 及后续版本 |
|
||||
| ANOMALY_WINDOW | 3.3.4.3 及后续版本 |
|
||||
| ANODE | 3.3.4.3+ |
|
||||
| ANODES | 3.3.4.3+ |
|
||||
| ANOMALY_WINDOW | 3.3.4.3+ |
|
||||
| ANTI | |
|
||||
| APPS | |
|
||||
| ARBGROUPS | |
|
||||
|
@ -37,8 +37,8 @@ description: TDengine 保留关键字的详细列表
|
|||
| ASOF | |
|
||||
| AT_ONCE | |
|
||||
| ATTACH | |
|
||||
| AUTO | 3.3.5.0 及后续版本 |
|
||||
| ASSIGN | 3.3.6.0 及后续版本 |
|
||||
| AUTO | 3.3.5.0+ |
|
||||
| ASSIGN | 3.3.6.0+ |
|
||||
|
||||
### B
|
||||
|关键字|说明|
|
||||
|
@ -78,16 +78,16 @@ description: TDengine 保留关键字的详细列表
|
|||
| CLIENT_VERSION | |
|
||||
| CLUSTER | |
|
||||
| COLON | |
|
||||
| COLS | 3.3.6.0 及后续版本 |
|
||||
| COLS | 3.3.6.0+ |
|
||||
| COLUMN | |
|
||||
| COMMA | |
|
||||
| COMMENT | |
|
||||
| COMP | |
|
||||
| COMPACT | |
|
||||
| COMPACTS | |
|
||||
| COMPACT_INTERVAL | 3.3.5.0 及后续版本 |
|
||||
| COMPACT_TIME_OFFSET | 3.3.5.0 及后续版本 |
|
||||
| COMPACT_TIME_RANGE | 3.3.5.0 及后续版本 |
|
||||
| COMPACT_INTERVAL | 3.3.5.0+ |
|
||||
| COMPACT_TIME_OFFSET | 3.3.5.0+ |
|
||||
| COMPACT_TIME_RANGE | 3.3.5.0+ |
|
||||
| CONCAT | |
|
||||
| CONFLICT | |
|
||||
| CONNECTION | |
|
||||
|
@ -117,7 +117,7 @@ description: TDengine 保留关键字的详细列表
|
|||
| DESC | |
|
||||
| DESCRIBE | |
|
||||
| DETACH | |
|
||||
| DISK_INFO | 3.3.5.0 及后续版本 |
|
||||
| DISK_INFO | 3.3.5.0+ |
|
||||
| DISTINCT | |
|
||||
| DISTRIBUTED | |
|
||||
| DIVIDE | |
|
||||
|
@ -150,19 +150,19 @@ description: TDengine 保留关键字的详细列表
|
|||
|关键字|说明|
|
||||
|----------------------|-|
|
||||
| FAIL | |
|
||||
| FHIGH | 3.3.4.3 及后续版本 |
|
||||
| FHIGH | 3.3.4.3+ |
|
||||
| FILE | |
|
||||
| FILL | |
|
||||
| FILL_HISTORY | |
|
||||
| FIRST | |
|
||||
| FLOAT | |
|
||||
| FLOW | 3.3.4.3 及后续版本 |
|
||||
| FLOW | 3.3.4.3+ |
|
||||
| FLUSH | |
|
||||
| FOR | |
|
||||
| FORCE | |
|
||||
| FORCE_WINDOW_CLOSE | 3.3.4.3 及后续版本 |
|
||||
| FORCE_WINDOW_CLOSE | 3.3.4.3+ |
|
||||
| FROM | |
|
||||
| FROWTS | 3.3.4.3 及后续版本 |
|
||||
| FROWTS | 3.3.4.3+ |
|
||||
| FULL | |
|
||||
| FUNCTION | |
|
||||
| FUNCTIONS | |
|
||||
|
@ -208,7 +208,7 @@ description: TDengine 保留关键字的详细列表
|
|||
| INTO | |
|
||||
| IPTOKEN | |
|
||||
| IROWTS | |
|
||||
| IROWTS_ORIGIN | 3.3.5.0 及后续版本 |
|
||||
| IROWTS_ORIGIN | 3.3.5.0+ |
|
||||
| IS | |
|
||||
| IS_IMPORT | |
|
||||
| ISFILLED | |
|
||||
|
@ -260,6 +260,7 @@ description: TDengine 保留关键字的详细列表
|
|||
| MEDIUMBLOB | |
|
||||
| MERGE | |
|
||||
| META | |
|
||||
| META_ONLY | 3.3.6.0+ |
|
||||
| MINROWS | |
|
||||
| MINUS | |
|
||||
| MNODE | |
|
||||
|
@ -277,8 +278,8 @@ description: TDengine 保留关键字的详细列表
|
|||
| NONE | |
|
||||
| NORMAL | |
|
||||
| NOT | |
|
||||
| NOTIFY | 3.3.6.0 及后续版本 |
|
||||
| NOTIFY_HISTORY | 3.3.6.0 及后续版本 |
|
||||
| NOTIFY | 3.3.6.0+ |
|
||||
| NOTIFY_HISTORY | 3.3.6.0+ |
|
||||
| NOTNULL | |
|
||||
| NOW | |
|
||||
| NULL | |
|
||||
|
@ -292,7 +293,7 @@ description: TDengine 保留关键字的详细列表
|
|||
| OFFSET | |
|
||||
| ON | |
|
||||
| ONLY | |
|
||||
| ON_FAILURE | 3.3.6.0 及后续版本 |
|
||||
| ON_FAILURE | 3.3.6.0+ |
|
||||
| OR | |
|
||||
| ORDER | |
|
||||
| OUTER | |
|
||||
|
@ -340,7 +341,7 @@ description: TDengine 保留关键字的详细列表
|
|||
| RATIO | |
|
||||
| READ | |
|
||||
| RECURSIVE | |
|
||||
| REGEXP | 3.3.6.0 及后续版本 |
|
||||
| REGEXP | 3.3.6.0+ |
|
||||
| REDISTRIBUTE | |
|
||||
| REM | |
|
||||
| REPLACE | |
|
||||
|
@ -429,7 +430,7 @@ description: TDengine 保留关键字的详细列表
|
|||
| TRANSACTIONS | |
|
||||
| TRIGGER | |
|
||||
| TRIM | |
|
||||
| TRUE_FOR | |
|
||||
| TRUE_FOR | 3.3.6.0+ |
|
||||
| TSDB_PAGESIZE | |
|
||||
| TSERIES | |
|
||||
| TSMA | |
|
||||
|
|
|
@ -1640,6 +1640,7 @@ typedef struct {
|
|||
int32_t sqlLen;
|
||||
char* sql;
|
||||
SArray* vgroupIds;
|
||||
int8_t metaOnly;
|
||||
} SCompactDbReq;
|
||||
|
||||
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
|
||||
|
@ -2085,6 +2086,7 @@ typedef struct {
|
|||
int64_t compactStartTime;
|
||||
STimeWindow tw;
|
||||
int32_t compactId;
|
||||
int8_t metaOnly;
|
||||
} SCompactVnodeReq;
|
||||
|
||||
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||
|
|
|
@ -169,6 +169,7 @@ typedef struct SCompactDatabaseStmt {
|
|||
char dbName[TSDB_DB_NAME_LEN];
|
||||
SNode* pStart;
|
||||
SNode* pEnd;
|
||||
bool metaOnly;
|
||||
} SCompactDatabaseStmt;
|
||||
|
||||
typedef struct SCompactVgroupsStmt {
|
||||
|
@ -177,6 +178,7 @@ typedef struct SCompactVgroupsStmt {
|
|||
SNodeList* vgidList;
|
||||
SNode* pStart;
|
||||
SNode* pEnd;
|
||||
bool metaOnly;
|
||||
} SCompactVgroupsStmt;
|
||||
|
||||
typedef struct STableOptions {
|
||||
|
|
|
@ -158,11 +158,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
|
|||
int32_t qStmtBindParams2(SQuery* pQuery, TAOS_STMT2_BIND* pParams, int32_t colIdx, void* charsetCxt);
|
||||
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt);
|
||||
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt);
|
||||
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
void* charsetCxt);
|
||||
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
int32_t colIdx, int32_t rowNum, void *charsetCxt);
|
||||
int32_t colIdx, int32_t rowNum, void* charsetCxt);
|
||||
int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt);
|
||||
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt);
|
||||
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt);
|
||||
|
||||
void destroyBoundColumnInfo(void* pBoundInfo);
|
||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||
|
|
|
@ -101,6 +101,7 @@ typedef struct {
|
|||
bool autoCreateTbl;
|
||||
SHashObj *pVgHash;
|
||||
SBindInfo2 *pBindInfo;
|
||||
bool bindRowFormat;
|
||||
|
||||
SStbInterlaceInfo siInfo;
|
||||
} SStmtSQLInfo2;
|
||||
|
|
|
@ -1421,7 +1421,12 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
|
|||
pStmt->exec.pCurrBlock = *pDataBlock;
|
||||
if (pStmt->sql.stbInterlaceMode) {
|
||||
taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
|
||||
pStmt->exec.pCurrBlock->pData->aCol = NULL;
|
||||
(*pDataBlock)->pData->aCol = NULL;
|
||||
}
|
||||
if (colIdx < -1) {
|
||||
pStmt->sql.bindRowFormat = true;
|
||||
taosArrayDestroy((*pDataBlock)->pData->aCol);
|
||||
(*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1449,10 +1454,21 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
|
|||
if (pStmt->sql.stbInterlaceMode) {
|
||||
(*pDataBlock)->pData->flags = 0;
|
||||
code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
|
||||
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt);
|
||||
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
|
||||
pStmt->taos->optionInfo.charsetCxt);
|
||||
} else {
|
||||
code =
|
||||
qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
|
||||
if (colIdx == -1) {
|
||||
if (pStmt->sql.bindRowFormat) {
|
||||
tscError("can't mix bind row format and bind column format");
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
|
||||
pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
|
||||
} else {
|
||||
code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
|
||||
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
|
||||
pStmt->taos->optionInfo.charsetCxt);
|
||||
}
|
||||
}
|
||||
|
||||
if (code) {
|
||||
|
@ -1465,6 +1481,11 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
|
|||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
if (pStmt->sql.bindRowFormat) {
|
||||
tscError("can't mix bind row format and bind column format");
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
|
||||
tscError("bind column index not in sequence");
|
||||
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||
|
@ -1695,11 +1716,11 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
|||
return pStmt->errCode;
|
||||
}
|
||||
|
||||
(void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||
TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
|
||||
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||
TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
|
||||
|
||||
if (pStmt->sql.stbInterlaceMode) {
|
||||
STMT_ERR_RET(stmtAddBatch2(pStmt));
|
||||
|
@ -1802,11 +1823,11 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
|||
pStmt->bindThreadInUse = false;
|
||||
}
|
||||
|
||||
(void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||
TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
|
||||
while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||
TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
|
||||
|
||||
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
|
||||
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
|
||||
|
|
|
@ -958,8 +958,49 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
|
|||
"double,bool_col bool,binary_col binary(20),nchar_col nchar(20),varbinary_col varbinary(20),geometry_col "
|
||||
"geometry(200)) tags(int_tag int,long_tag bigint,double_tag double,bool_tag bool,binary_tag "
|
||||
"binary(20),nchar_tag nchar(20),varbinary_tag varbinary(20),geometry_tag geometry(200));");
|
||||
do_query(taos, "use stmt2_testdb_6");
|
||||
|
||||
TAOS_STMT2_OPTION option = {0, false, false, NULL, NULL};
|
||||
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
|
||||
|
||||
// less cols and tags using stb
|
||||
{
|
||||
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
||||
ASSERT_NE(stmt, nullptr);
|
||||
const char* sql = "INSERT INTO stmt2_testdb_6.? using stmt2_testdb_6.stb1 (int_tag)tags(1) (ts) VALUES (?)";
|
||||
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||
checkError(stmt, code);
|
||||
int total_affect_rows = 0;
|
||||
|
||||
int t64_len[2] = {sizeof(int64_t), sizeof(int64_t)};
|
||||
int tag_i = 0;
|
||||
int tag_l = sizeof(int);
|
||||
int64_t ts[2] = {1591060628000, 1591060628100};
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ts[0] += 1000;
|
||||
ts[1] += 1000;
|
||||
|
||||
TAOS_STMT2_BIND tags1 = {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1};
|
||||
TAOS_STMT2_BIND tags2 = {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1};
|
||||
TAOS_STMT2_BIND params1 = {TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2};
|
||||
TAOS_STMT2_BIND params2 = {TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2};
|
||||
|
||||
TAOS_STMT2_BIND* tagv[2] = {&tags1, &tags2};
|
||||
TAOS_STMT2_BIND* paramv[2] = {¶ms1, ¶ms2};
|
||||
char* tbname[2] = {"tb1", "tb2"};
|
||||
TAOS_STMT2_BINDV bindv = {2, &tbname[0], NULL, ¶mv[0]};
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
checkError(stmt, code);
|
||||
|
||||
int affected_rows;
|
||||
taos_stmt2_exec(stmt, &affected_rows);
|
||||
total_affect_rows += affected_rows;
|
||||
|
||||
checkError(stmt, code);
|
||||
}
|
||||
|
||||
ASSERT_EQ(total_affect_rows, 12);
|
||||
taos_stmt2_close(stmt);
|
||||
}
|
||||
|
||||
// less cols and tags
|
||||
{
|
||||
|
@ -985,7 +1026,7 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
|
|||
|
||||
TAOS_STMT2_BIND* tagv[2] = {&tags1, &tags2};
|
||||
TAOS_STMT2_BIND* paramv[2] = {¶ms1, ¶ms2};
|
||||
char* tbname[2] = {"tb1", "tb2"};
|
||||
char* tbname[2] = {"tb3", "tb4"};
|
||||
TAOS_STMT2_BINDV bindv = {2, &tbname[0], &tagv[0], ¶mv[0]};
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
checkError(stmt, code);
|
||||
|
@ -1013,26 +1054,29 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
|
|||
int tag_l = sizeof(int);
|
||||
int tag_bl = 3;
|
||||
int64_t ts[2] = {1591060628000, 1591060628100};
|
||||
int64_t ts_2[2] = {1591060628800, 1591060628900};
|
||||
int t64_len[2] = {sizeof(int64_t), sizeof(int64_t)};
|
||||
int coli[2] = {1, 2};
|
||||
int coli_2[2] = {3, 4};
|
||||
int ilen[2] = {sizeof(int), sizeof(int)};
|
||||
int total_affect_rows = 0;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ts[0] += 1000;
|
||||
ts[1] += 1000;
|
||||
ts_2[0] += 1000;
|
||||
ts_2[1] += 1000;
|
||||
|
||||
TAOS_STMT2_BIND tags1[2] = {{TSDB_DATA_TYPE_BINARY, (void*)"abc", &tag_bl, NULL, 1},
|
||||
{TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}};
|
||||
TAOS_STMT2_BIND tags2[2] = {{TSDB_DATA_TYPE_BINARY, (void*)"abc", &tag_bl, NULL, 1},
|
||||
{TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}};
|
||||
TAOS_STMT2_BIND params1[2] = {{TSDB_DATA_TYPE_INT, &coli, &ilen[0], NULL, 2},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2}};
|
||||
TAOS_STMT2_BIND params2[2] = {{TSDB_DATA_TYPE_INT, &coli, &ilen[0], NULL, 2},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2}};
|
||||
TAOS_STMT2_BIND tags[2][2] = {
|
||||
{{TSDB_DATA_TYPE_BINARY, (void*)"abc", &tag_bl, NULL, 1}, {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}},
|
||||
{{TSDB_DATA_TYPE_BINARY, (void*)"def", &tag_bl, NULL, 1}, {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}}};
|
||||
TAOS_STMT2_BIND params[2][2] = {
|
||||
{{TSDB_DATA_TYPE_INT, &coli[0], &ilen[0], NULL, 2}, {TSDB_DATA_TYPE_TIMESTAMP, &ts[0], &t64_len[0], NULL, 2}},
|
||||
{{TSDB_DATA_TYPE_INT, &coli_2[0], &ilen[0], NULL, 2},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, &ts_2[0], &t64_len[0], NULL, 2}}};
|
||||
|
||||
TAOS_STMT2_BIND* tagv[2] = {&tags1[0], &tags2[0]};
|
||||
TAOS_STMT2_BIND* paramv[2] = {¶ms1[0], ¶ms2[0]};
|
||||
char* tbname[2] = {"tb3", "tb4"};
|
||||
TAOS_STMT2_BIND* tagv[2] = {&tags[0][0], &tags[1][0]};
|
||||
TAOS_STMT2_BIND* paramv[2] = {¶ms[0][0], ¶ms[1][0]};
|
||||
char* tbname[2] = {"tb5", "tb6"};
|
||||
TAOS_STMT2_BINDV bindv = {2, &tbname[0], &tagv[0], ¶mv[0]};
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
checkError(stmt, code);
|
||||
|
@ -1894,4 +1938,158 @@ TEST(stmt2Case, async_order) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST(stmt2Case, rowformat_bind) {
|
||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0);
|
||||
ASSERT_NE(taos, nullptr);
|
||||
|
||||
do_query(taos, "drop database if exists stmt2_testdb_16");
|
||||
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_16");
|
||||
do_query(
|
||||
taos,
|
||||
"create stable stmt2_testdb_16.stb(ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(8), c6 "
|
||||
"smallint, c7 "
|
||||
"tinyint, c8 bool, c9 nchar(8), c10 geometry(256))TAGS(tts timestamp, t1 int, t2 bigint, t3 float, t4 double, t5 "
|
||||
"binary(8), t6 smallint, t7 tinyint, t8 bool, t9 nchar(8), t10 geometry(256))");
|
||||
|
||||
TAOS_STMT2_OPTION option = {0};
|
||||
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
||||
ASSERT_NE(stmt, nullptr);
|
||||
int code = 0;
|
||||
uintptr_t c10len = 0;
|
||||
struct {
|
||||
int64_t c1;
|
||||
int32_t c2;
|
||||
int64_t c3;
|
||||
float c4;
|
||||
double c5;
|
||||
unsigned char c6[8];
|
||||
int16_t c7;
|
||||
int8_t c8;
|
||||
int8_t c9;
|
||||
char c10[32];
|
||||
} v = {1591060628000, 1, 2, 3.0, 4.0, "abcdef", 5, 6, 7, "ijnop"};
|
||||
|
||||
struct {
|
||||
int32_t c1;
|
||||
int32_t c2;
|
||||
int32_t c3;
|
||||
int32_t c4;
|
||||
int32_t c5;
|
||||
int32_t c6;
|
||||
int32_t c7;
|
||||
int32_t c8;
|
||||
int32_t c9;
|
||||
int32_t c10;
|
||||
} v_len = {sizeof(int64_t), sizeof(int32_t),
|
||||
sizeof(int64_t), sizeof(float),
|
||||
sizeof(double), 8,
|
||||
sizeof(int16_t), sizeof(int8_t),
|
||||
sizeof(int8_t), 8};
|
||||
TAOS_STMT2_BIND params[11];
|
||||
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
params[0].length = (int32_t*)&v_len.c1;
|
||||
params[0].buffer = &v.c1;
|
||||
params[0].is_null = NULL;
|
||||
params[0].num = 1;
|
||||
|
||||
params[1].buffer_type = TSDB_DATA_TYPE_INT;
|
||||
params[1].buffer = &v.c2;
|
||||
params[1].length = (int32_t*)&v_len.c2;
|
||||
params[1].is_null = NULL;
|
||||
params[1].num = 1;
|
||||
|
||||
params[2].buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||
params[2].buffer = &v.c3;
|
||||
params[2].length = (int32_t*)&v_len.c3;
|
||||
params[2].is_null = NULL;
|
||||
params[2].num = 1;
|
||||
|
||||
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||
params[3].buffer = &v.c4;
|
||||
params[3].length = (int32_t*)&v_len.c4;
|
||||
params[3].is_null = NULL;
|
||||
params[3].num = 1;
|
||||
|
||||
params[4].buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||
params[4].buffer = &v.c5;
|
||||
params[4].length = (int32_t*)&v_len.c5;
|
||||
params[4].is_null = NULL;
|
||||
params[4].num = 1;
|
||||
|
||||
params[5].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
params[5].buffer = &v.c6;
|
||||
params[5].length = (int32_t*)&v_len.c6;
|
||||
params[5].is_null = NULL;
|
||||
params[5].num = 1;
|
||||
|
||||
params[6].buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||
params[6].buffer = &v.c7;
|
||||
params[6].length = (int32_t*)&v_len.c7;
|
||||
params[6].is_null = NULL;
|
||||
params[6].num = 1;
|
||||
|
||||
params[7].buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||
params[7].buffer = &v.c8;
|
||||
params[7].length = (int32_t*)&v_len.c8;
|
||||
params[7].is_null = NULL;
|
||||
params[7].num = 1;
|
||||
|
||||
params[8].buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||
params[8].buffer = &v.c9;
|
||||
params[8].length = (int32_t*)&v_len.c9;
|
||||
params[8].is_null = NULL;
|
||||
params[8].num = 1;
|
||||
|
||||
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||
params[9].buffer = &v.c10;
|
||||
params[9].length = (int32_t*)&v_len.c10;
|
||||
params[9].is_null = NULL;
|
||||
params[9].num = 1;
|
||||
|
||||
unsigned char* outputGeom1;
|
||||
size_t size1;
|
||||
initCtxMakePoint();
|
||||
code = doMakePoint(1.000, 2.000, &outputGeom1, &size1);
|
||||
checkError(stmt, code);
|
||||
params[10].buffer_type = TSDB_DATA_TYPE_GEOMETRY;
|
||||
params[10].buffer = outputGeom1;
|
||||
params[10].length = (int32_t*)&size1;
|
||||
params[10].is_null = NULL;
|
||||
params[10].num = 1;
|
||||
|
||||
char* stmt_sql = "insert into stmt2_testdb_16.? using stb tags(?,?,?,?,?,?,?,?,?,?,?)values (?,?,?,?,?,?,?,?,?,?,?)";
|
||||
code = taos_stmt2_prepare(stmt, stmt_sql, 0);
|
||||
checkError(stmt, code);
|
||||
|
||||
char* tbname[1] = {"tb1"};
|
||||
TAOS_STMT2_BIND* tags = ¶ms[0];
|
||||
TAOS_STMT2_BIND* cols = ¶ms[0];
|
||||
TAOS_STMT2_BINDV bindv = {1, &tbname[0], &tags, &cols};
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -2);
|
||||
checkError(stmt, code);
|
||||
|
||||
int affected_rows;
|
||||
code = taos_stmt2_exec(stmt, &affected_rows);
|
||||
checkError(stmt, code);
|
||||
ASSERT_EQ(affected_rows, 1);
|
||||
|
||||
int64_t ts2 = 1591060628000;
|
||||
params[0].buffer = &ts2;
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -2);
|
||||
checkError(stmt, code);
|
||||
|
||||
code = taos_stmt2_exec(stmt, &affected_rows);
|
||||
checkError(stmt, code);
|
||||
ASSERT_EQ(affected_rows, 1);
|
||||
|
||||
params[0].buffer = &ts2;
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
ASSERT_EQ(code, TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
|
||||
geosFreeBuffer(outputGeom1);
|
||||
taos_stmt2_close(stmt);
|
||||
do_query(taos, "drop database if exists stmt2_testdb_16");
|
||||
taos_close(taos);
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -1478,6 +1478,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
}
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timestamp));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timestamp));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -4691,6 +4693,8 @@ int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq)
|
|||
}
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->metaOnly));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@@ -4734,6 +4738,12 @@ int32_t tDeserializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq
      }
    }
  }

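  // If the request does not carry the optional trailing metaOnly field
  // (e.g. it was produced by an older version), default to a full compact.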
  if (!tDecodeIsEnd(&decoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->metaOnly));
  } else {
    pReq->metaOnly = false;
  }
  tEndDecode(&decoder);

_exit:

@ -7161,6 +7171,7 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *
|
|||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->tw.ekey));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->compactId));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->metaOnly));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -7198,6 +7209,12 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
|
|||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->compactId));
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->metaOnly));
|
||||
} else {
|
||||
pReq->metaOnly = false;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
|
|
|
@ -3269,7 +3269,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
|
|||
}
|
||||
|
||||
if (!infoSorted) {
|
||||
taosqsort_r(infos, numOfInfos, sizeof(SBindInfo), NULL, tBindInfoCompare);
|
||||
taosqsort_r(infos, numOfInfos, sizeof(SBindInfo2), NULL, tBindInfoCompare);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -170,8 +170,6 @@ static void dmSetSignalHandle() {
|
|||
#endif
|
||||
}
|
||||
|
||||
extern bool generateNewMeta;
|
||||
|
||||
static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
||||
global.startTime = taosGetTimestampMs();
|
||||
|
||||
|
@ -210,8 +208,6 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
|||
global.dumpSdb = true;
|
||||
} else if (strcmp(argv[i], "-dTxn") == 0) {
|
||||
global.deleteTrans = true;
|
||||
} else if (strcmp(argv[i], "-r") == 0) {
|
||||
generateNewMeta = true;
|
||||
} else if (strcmp(argv[i], "-E") == 0) {
|
||||
if (i < argc - 1) {
|
||||
if (strlen(argv[++i]) >= PATH_MAX) {
|
||||
|
|
|
@ -47,7 +47,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnode
|
|||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||
SArray *pArray, SVgObj* pNewVgroup);
|
||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||
STimeWindow tw);
|
||||
STimeWindow tw, bool metaOnly);
|
||||
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||
SArray *pArray);
|
||||
|
||||
|
|
|
@ -927,7 +927,8 @@ static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pD
|
|||
return 0;
|
||||
}
|
||||
|
||||
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds);
|
||||
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
|
||||
bool metaOnly);
|
||||
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
@ -982,7 +983,7 @@ static int32_t mndCompactDispatch(SRpcMsg *pReq) {
|
|||
.skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
|
||||
.ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
|
||||
|
||||
if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL)) == 0) {
|
||||
if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false)) == 0) {
|
||||
mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
|
||||
"m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
|
||||
pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
|
||||
|
|
|
@ -3632,11 +3632,12 @@ bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
|
|||
}
|
||||
|
||||
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
|
||||
STimeWindow tw) {
|
||||
STimeWindow tw, bool metaOnly) {
|
||||
SCompactVnodeReq compactReq = {0};
|
||||
compactReq.dbUid = pDb->uid;
|
||||
compactReq.compactStartTime = compactTs;
|
||||
compactReq.tw = tw;
|
||||
compactReq.metaOnly = metaOnly;
|
||||
tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
||||
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
|
||||
|
@ -3667,13 +3668,13 @@ static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgrou
|
|||
}
|
||||
|
||||
static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||
STimeWindow tw) {
|
||||
STimeWindow tw, bool metaOnly) {
|
||||
int32_t code = 0;
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw);
|
||||
void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs, tw, metaOnly);
|
||||
if (pReq == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
|
@ -3693,7 +3694,7 @@ static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
}
|
||||
|
||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||
STimeWindow tw) {
|
||||
TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw));
|
||||
STimeWindow tw, bool metaOnly) {
|
||||
TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly));
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@ if(TD_VNODE_PLUGINS)
|
|||
vnode
|
||||
PRIVATE
|
||||
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c
|
||||
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/metaCompact.c
|
||||
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompactMonitor.c
|
||||
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c
|
||||
)
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SMetaIdx SMetaIdx;
|
||||
typedef struct SMetaDB SMetaDB;
|
||||
typedef struct SMetaCache SMetaCache;
|
||||
|
||||
|
@ -103,8 +102,6 @@ struct SMeta {
|
|||
// stream
|
||||
TTB* pStreamDb;
|
||||
|
||||
SMetaIdx* pIdx;
|
||||
|
||||
SMetaCache* pCache;
|
||||
};
|
||||
|
||||
|
|
|
@ -168,7 +168,7 @@ int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tb
|
|||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
|
||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock);
|
||||
int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock);
|
||||
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||
|
@ -487,7 +487,13 @@ struct SVnode {
|
|||
// commit variables
|
||||
SVATaskID commitTask;
|
||||
|
||||
SMeta* pMeta;
|
||||
struct {
|
||||
TdThreadRwlock metaRWLock;
|
||||
SMeta* pMeta;
|
||||
SMeta* pNewMeta;
|
||||
SVATaskID metaCompactTask;
|
||||
};
|
||||
|
||||
SSma* pSma;
|
||||
STsdb* pTsdb;
|
||||
SWal* pWal;
|
||||
|
|
|
@ -135,12 +135,17 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||
int metaDecodeEntryImpl(SDecoder *pCoder, SMetaEntry *pME, bool headerOnly) {
|
||||
TAOS_CHECK_RETURN(tStartDecode(pCoder));
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version));
|
||||
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type));
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid));
|
||||
|
||||
if (headerOnly) {
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pME->type > 0) {
|
||||
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name));
|
||||
|
||||
|
@ -209,6 +214,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { return metaDecodeEntryImpl(pCoder, pME, false); }
|
||||
|
||||
static int32_t metaCloneSchema(const SSchemaWrapper *pSrc, SSchemaWrapper *pDst) {
|
||||
if (pSrc == NULL || pDst == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
|
|
@ -133,7 +133,7 @@ static void doScan(SMeta *pMeta) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t metaOpenImpl(SVnode *pVnode, SMeta **ppMeta, const char *metaDir, int8_t rollback) {
|
||||
int32_t metaOpenImpl(SVnode *pVnode, SMeta **ppMeta, const char *metaDir, int8_t rollback) {
|
||||
SMeta *pMeta = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
@ -251,187 +251,35 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
bool generateNewMeta = false;
|
||||
|
||||
static int32_t metaGenerateNewMeta(SMeta **ppMeta) {
|
||||
SMeta *pNewMeta = NULL;
|
||||
SMeta *pMeta = *ppMeta;
|
||||
SVnode *pVnode = pMeta->pVnode;
|
||||
|
||||
metaInfo("vgId:%d start to generate new meta", TD_VID(pMeta->pVnode));
|
||||
|
||||
// Open a new meta for orgainzation
|
||||
int32_t code = metaOpenImpl(pMeta->pVnode, &pNewMeta, VNODE_META_TMP_DIR, false);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = metaBegin(pNewMeta, META_BEGIN_HEAP_NIL);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// i == 0, scan super table
|
||||
// i == 1, scan normal table and child table
|
||||
for (int i = 0; i < 2; i++) {
|
||||
TBC *uidCursor = NULL;
|
||||
int32_t counter = 0;
|
||||
|
||||
code = tdbTbcOpen(pMeta->pUidIdx, &uidCursor, NULL);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to open uid index cursor, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveToFirst(uidCursor);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to move to first, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
tdbTbcClose(uidCursor);
|
||||
return code;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
const void *pKey;
|
||||
int kLen;
|
||||
const void *pVal;
|
||||
int vLen;
|
||||
|
||||
if (tdbTbcGet(uidCursor, &pKey, &kLen, &pVal, &vLen) < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tb_uid_t uid = *(tb_uid_t *)pKey;
|
||||
SUidIdxVal *pUidIdxVal = (SUidIdxVal *)pVal;
|
||||
if ((i == 0 && (pUidIdxVal->suid && pUidIdxVal->suid == uid)) // super table
|
||||
|| (i == 1 && (pUidIdxVal->suid == 0 || pUidIdxVal->suid != uid)) // normal table and child table
|
||||
) {
|
||||
counter++;
|
||||
if (i == 0) {
|
||||
metaInfo("vgId:%d counter:%d new meta handle %s table uid:%" PRId64, TD_VID(pVnode), counter, "super", uid);
|
||||
} else {
|
||||
metaInfo("vgId:%d counter:%d new meta handle %s table uid:%" PRId64, TD_VID(pVnode), counter,
|
||||
pUidIdxVal->suid == 0 ? "normal" : "child", uid);
|
||||
}
|
||||
|
||||
// fetch table entry
|
||||
void *value = NULL;
|
||||
int valueSize = 0;
|
||||
if (tdbTbGet(pMeta->pTbDb,
|
||||
&(STbDbKey){
|
||||
.version = pUidIdxVal->version,
|
||||
.uid = uid,
|
||||
},
|
||||
sizeof(uid), &value, &valueSize) == 0) {
|
||||
SDecoder dc = {0};
|
||||
SMetaEntry me = {0};
|
||||
tDecoderInit(&dc, value, valueSize);
|
||||
if (metaDecodeEntry(&dc, &me) == 0) {
|
||||
if (me.type == TSDB_CHILD_TABLE &&
|
||||
tdbTbGet(pMeta->pUidIdx, &me.ctbEntry.suid, sizeof(me.ctbEntry.suid), NULL, NULL) != 0) {
|
||||
metaError("vgId:%d failed to get super table uid:%" PRId64 " for child table uid:%" PRId64,
|
||||
TD_VID(pVnode), me.ctbEntry.suid, uid);
|
||||
} else if (metaHandleEntry2(pNewMeta, &me) != 0) {
|
||||
metaError("vgId:%d failed to handle entry, uid:%" PRId64, TD_VID(pVnode), uid);
|
||||
}
|
||||
}
|
||||
tDecoderClear(&dc);
|
||||
}
|
||||
tdbFree(value);
|
||||
}
|
||||
|
||||
code = tdbTbcMoveToNext(uidCursor);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to move to next, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
tdbTbcClose(uidCursor);
|
||||
}
|
||||
|
||||
code = metaCommit(pNewMeta, pNewMeta->txn);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to commit, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = metaFinishCommit(pNewMeta, pNewMeta->txn);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to finish commit, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((code = metaBegin(pNewMeta, META_BEGIN_HEAP_NIL)) != 0) {
|
||||
metaError("vgId:%d failed to begin new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
}
|
||||
metaClose(&pNewMeta);
|
||||
metaInfo("vgId:%d finish to generate new meta", TD_VID(pVnode));
|
||||
return 0;
|
||||
void vnodeGetMetaPath(SVnode *pVnode, const char *metaDir, char *fname) {
|
||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fname, TSDB_FILENAME_LEN);
|
||||
int32_t offset = strlen(fname);
|
||||
snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, metaDir);
|
||||
}
|
||||
|
||||
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||
if (generateNewMeta) {
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
char oldMetaPath[TSDB_FILENAME_LEN] = {0};
|
||||
char newMetaPath[TSDB_FILENAME_LEN] = {0};
|
||||
char backupMetaPath[TSDB_FILENAME_LEN] = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
char metaDir[TSDB_FILENAME_LEN] = {0};
|
||||
char metaTempDir[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
||||
snprintf(oldMetaPath, sizeof(oldMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_DIR);
|
||||
snprintf(newMetaPath, sizeof(newMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_TMP_DIR);
|
||||
snprintf(backupMetaPath, sizeof(backupMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_BACKUP_DIR);
|
||||
vnodeGetMetaPath(pVnode, VNODE_META_DIR, metaDir);
|
||||
vnodeGetMetaPath(pVnode, VNODE_META_TMP_DIR, metaTempDir);
|
||||
|
||||
bool oldMetaExist = taosCheckExistFile(oldMetaPath);
|
||||
bool newMetaExist = taosCheckExistFile(newMetaPath);
|
||||
bool backupMetaExist = taosCheckExistFile(backupMetaPath);
|
||||
|
||||
if ((!backupMetaExist && !oldMetaExist && newMetaExist) // case 2
|
||||
|| (backupMetaExist && !oldMetaExist && !newMetaExist) // case 4
|
||||
|| (backupMetaExist && oldMetaExist && newMetaExist) // case 8
|
||||
) {
|
||||
metaError("vgId:%d invalid meta state, please check", TD_VID(pVnode));
|
||||
return TSDB_CODE_FAILED;
|
||||
} else if ((backupMetaExist && oldMetaExist && !newMetaExist) // case 7
|
||||
|| (!backupMetaExist && !oldMetaExist && !newMetaExist) // case 1
|
||||
) {
|
||||
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
|
||||
} else if (backupMetaExist && !oldMetaExist && newMetaExist) {
|
||||
if (taosRenameFile(newMetaPath, oldMetaPath) != 0) {
|
||||
metaError("vgId:%d failed to rename new meta to old meta, reason:%s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
|
||||
} else {
|
||||
int32_t code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = metaGenerateNewMeta(ppMeta);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to generate new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
}
|
||||
|
||||
metaClose(ppMeta);
|
||||
if (taosRenameFile(oldMetaPath, backupMetaPath) != 0) {
|
||||
metaError("vgId:%d failed to rename old meta to backup, reason:%s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// rename the new meta to old meta
|
||||
if (taosRenameFile(newMetaPath, oldMetaPath) != 0) {
|
||||
metaError("vgId:%d failed to rename new meta to old meta, reason:%s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, false);
|
||||
if (code) {
|
||||
metaError("vgId:%d failed to open new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
// Check file states
|
||||
if (!taosCheckExistFile(metaDir) && taosCheckExistFile(metaTempDir)) {
|
||||
code = taosRenameFile(metaTempDir, metaDir);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s: rename %s to %s failed", TD_VID(pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(code), metaTempDir, metaDir);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
|
||||
// Do open meta
|
||||
code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -21,6 +21,7 @@ struct SMetaSnapReader {
|
|||
int64_t sver;
|
||||
int64_t ever;
|
||||
TBC* pTbc;
|
||||
int32_t iLoop;
|
||||
};
|
||||
|
||||
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
|
||||
|
@ -65,6 +66,22 @@ void metaSnapReaderClose(SMetaSnapReader** ppReader) {
|
|||
}
|
||||
}
|
||||
|
||||
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);
|
||||
|
||||
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (uint8_t*)data, size);
|
||||
|
||||
int32_t code = metaDecodeEntryImpl(&decoder, entry, true);
|
||||
if (code) {
|
||||
tDecoderClear(&decoder);
|
||||
return code;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
const void* pKey = NULL;
|
||||
|
@ -72,19 +89,47 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
|||
int32_t nKey = 0;
|
||||
int32_t nData = 0;
|
||||
STbDbKey key;
|
||||
int32_t c;
|
||||
|
||||
*ppData = NULL;
|
||||
for (;;) {
|
||||
if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
|
||||
while (pReader->iLoop < 2) {
|
||||
if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) != 0 || ((STbDbKey*)pKey)->version > pReader->ever) {
|
||||
pReader->iLoop++;
|
||||
|
||||
// Reopen the cursor to read from the beginning
|
||||
tdbTbcClose(pReader->pTbc);
|
||||
pReader->pTbc = NULL;
|
||||
code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = pReader->sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Decode meta entry
|
||||
SMetaEntry entry = {0};
|
||||
code = metaDecodeEntryHeader((void*)pData, nData, &entry);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
key = ((STbDbKey*)pKey)[0];
|
||||
if (key.version > pReader->ever) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (key.version < pReader->sver) {
|
||||
if (key.version < pReader->sver //
|
||||
|| (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE) // First loop send super table entry
|
||||
|| (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE) // Second loop send non-super table entry
|
||||
) {
|
||||
if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
|
||||
metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
|
||||
}
|
||||
|
|
|
@ -449,6 +449,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
|||
}
|
||||
|
||||
// open meta
|
||||
(void)taosThreadRwlockInit(&pVnode->metaRWLock, NULL);
|
||||
vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
|
||||
if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
|
||||
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
|
@ -548,6 +549,7 @@ _err:
|
|||
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
|
||||
if (pVnode->freeList) vnodeCloseBufPool(pVnode);
|
||||
|
||||
(void)taosThreadRwlockDestroy(&pVnode->metaRWLock);
|
||||
taosMemoryFree(pVnode);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol
|
|||
return 0;
|
||||
}
|
||||
|
||||
void vnodePrintTableMeta(STableMetaRsp* pMeta) {
|
||||
void vnodePrintTableMeta(STableMetaRsp *pMeta) {
|
||||
if (!(qDebugFlag & DEBUG_DEBUG)) {
|
||||
return;
|
||||
}
|
||||
|
@ -70,14 +70,13 @@ void vnodePrintTableMeta(STableMetaRsp* pMeta) {
|
|||
qDebug("sysInfo:%d", pMeta->sysInfo);
|
||||
if (pMeta->pSchemas) {
|
||||
for (int32_t i = 0; i < (pMeta->numOfColumns + pMeta->numOfTags); ++i) {
|
||||
SSchema* pSchema = pMeta->pSchemas + i;
|
||||
qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes, pSchema->name);
|
||||
SSchema *pSchema = pMeta->pSchemas + i;
|
||||
qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags,
|
||||
pSchema->colId, pSchema->bytes, pSchema->name);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
||||
STableInfoReq infoReq = {0};
|
||||
STableMetaRsp metaRsp = {0};
|
||||
|
@ -528,6 +527,13 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
#define VNODE_DO_META_QUERY(pVnode, cmd) \
|
||||
do { \
|
||||
(void)taosThreadRwlockRdlock(&(pVnode)->metaRWLock); \
|
||||
cmd; \
|
||||
(void)taosThreadRwlockUnlock(&(pVnode)->metaRWLock); \
|
||||
} while (0)
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||
SSyncState state = syncGetState(pVnode->sync);
|
||||
pLoad->syncAppliedIndex = pVnode->state.applied;
|
||||
|
@ -543,8 +549,8 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
|||
pLoad->learnerProgress = state.progress;
|
||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
|
||||
VNODE_DO_META_QUERY(pVnode, pLoad->numOfTables = metaGetTbNum(pVnode->pMeta));
|
||||
VNODE_DO_META_QUERY(pVnode, pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1));
|
||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||
pLoad->compStorage = (int64_t)2 * 1073741824;
|
||||
pLoad->pointsWritten = 100;
|
||||
|
|
|
@ -202,9 +202,9 @@ SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode*
|
|||
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||
SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t maxSpeed);
|
||||
SNode* createS3MigrateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd);
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd, bool metaOnly);
|
||||
SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart,
|
||||
SNode* pEnd);
|
||||
SNode* pEnd, bool metaOnly);
|
||||
SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
|
||||
SNode* createAlterTableOptions(SAstCreateContext* pCxt);
|
||||
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal);
|
||||
|
|
|
@@ -242,8 +242,13 @@ cmd ::= ALTER DATABASE db_name(A) alter_db_options(B).
cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); }
cmd ::= TRIM DATABASE db_name(A) speed_opt(B). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A, B); }
cmd ::= S3MIGRATE DATABASE db_name(A). { pCxt->pRootNode = createS3MigrateDatabaseStmt(pCxt, &A); }
cmd ::= COMPACT DATABASE db_name(A) start_opt(B) end_opt(C). { pCxt->pRootNode = createCompactStmt(pCxt, &A, B, C); }
cmd ::= COMPACT db_name_cond_opt(A) VGROUPS IN NK_LP integer_list(B) NK_RP start_opt(C) end_opt(D). { pCxt->pRootNode = createCompactVgroupsStmt(pCxt, A, B, C, D); }
cmd ::= COMPACT DATABASE db_name(A) start_opt(B) end_opt(C) meta_only(D). { pCxt->pRootNode = createCompactStmt(pCxt, &A, B, C, D); }
cmd ::= COMPACT db_name_cond_opt(A) VGROUPS IN NK_LP integer_list(B) NK_RP start_opt(C) end_opt(D) meta_only(E). { pCxt->pRootNode = createCompactVgroupsStmt(pCxt, A, B, C, D, E); }

%type meta_only { bool }
%destructor meta_only { }
meta_only(A) ::= . { A = false; }
meta_only(A) ::= META_ONLY. { A = true; }

%type not_exists_opt { bool }
%destructor not_exists_opt { }

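A brief note on the grammar change above: `meta_only` has an empty production that defaults to `false`, so statements written before this change keep parsing unchanged, while the new `META_ONLY` token simply flips the flag passed to `createCompactStmt`/`createCompactVgroupsStmt`. A hedged sketch of the two parse paths (the database name, vgroup ID, and timestamp are placeholders):

```sql
-- Matches the empty meta_only production: metaOnly = false
COMPACT DATABASE db1 start with '2024-01-01 00:00:00';

-- Matches the META_ONLY production: metaOnly = true
COMPACT db1.VGROUPS IN (2) META_ONLY;
```
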
@ -2212,7 +2212,7 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd) {
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd, bool metaOnly) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
CHECK_NAME(checkDbName(pCxt, pDbName, false));
|
||||
SCompactDatabaseStmt* pStmt = NULL;
|
||||
|
@ -2221,6 +2221,7 @@ SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart
|
|||
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
|
||||
pStmt->pStart = pStart;
|
||||
pStmt->pEnd = pEnd;
|
||||
pStmt->metaOnly = metaOnly;
|
||||
return (SNode*)pStmt;
|
||||
_err:
|
||||
nodesDestroyNode(pStart);
|
||||
|
@ -2229,7 +2230,7 @@ _err:
|
|||
}
|
||||
|
||||
SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart,
|
||||
SNode* pEnd) {
|
||||
SNode* pEnd, bool metaOnly) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
if (NULL == pDbName) {
|
||||
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "database not specified");
|
||||
|
@ -2243,6 +2244,7 @@ SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeLi
|
|||
pStmt->vgidList = vgidList;
|
||||
pStmt->pStart = pStart;
|
||||
pStmt->pEnd = pEnd;
|
||||
pStmt->metaOnly = metaOnly;
|
||||
return (SNode*)pStmt;
|
||||
_err:
|
||||
nodesDestroyNode(pDbName);
|
||||
|
|
|
@ -54,8 +54,12 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) {
|
|||
|
||||
int32_t colNum = taosArrayGetSize(pNew->aCol);
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
SColData* pCol = (SColData*)taosArrayGet(pNew->aCol, i);
|
||||
tColDataDeepClear(pCol);
|
||||
if (pDataBlock->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
SColData* pCol = (SColData*)taosArrayGet(pNew->aCol, i);
|
||||
tColDataDeepClear(pCol);
|
||||
} else {
|
||||
pNew->aCol = taosArrayInit(20, POINTER_BYTES);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -324,7 +328,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
|
|||
int16_t lastColId = -1;
|
||||
bool colInOrder = true;
|
||||
|
||||
if (NULL == *pTSchema) {
|
||||
if (NULL == pTSchema || NULL == *pTSchema) {
|
||||
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
||||
}
|
||||
|
||||
|
@ -693,7 +697,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
|
|||
bool colInOrder = true;
|
||||
int ncharColNums = 0;
|
||||
|
||||
if (NULL == *pTSchema) {
|
||||
if (NULL == pTSchema || NULL == *pTSchema) {
|
||||
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
||||
}
|
||||
|
||||
|
@ -739,6 +743,22 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
        goto _return;
      }
      pBindInfos[c].bind = taosArrayGetLast(ncharBinds);
    } else if (TSDB_DATA_TYPE_GEOMETRY == pColSchema->type) {
      code = initCtxAsText();
      if (code) {
        qError("geometry init failed:%s", tstrerror(code));
        goto _return;
      }
      uint8_t* buf = bind[c].buffer;
      for (int j = 0; j < bind[c].num; j++) {
        code = checkWKB(buf, bind[c].length[j]);
        if (code) {
          qError("geometry data must be in WKB format");
          goto _return;
        }
        buf += bind[c].length[j];
      }
      pBindInfos[c].bind = bind + c;
    } else {
      pBindInfos[c].bind = bind + c;
    }
|
@ -816,7 +836,8 @@ static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STM
  return TSDB_CODE_SUCCESS;
}

int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt) {
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
                            void* charsetCxt) {
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SSchema*       pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;

@ -834,7 +855,7 @@ int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind,
      goto _return;
    }

    if(boundInfo->pColIndex[c]==0){
    if (boundInfo->pColIndex[c] == 0) {
      pCol->cflag |= COL_IS_KEY;
    }
|
@ -926,6 +947,94 @@ _return:
  return code;
}

int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
                           STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt) {
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t          rowNum = bind->num;
  TAOS_STMT2_BIND  ncharBind = {0};
  TAOS_STMT2_BIND* pBind = NULL;
  int32_t          code = 0;
  int16_t          lastColId = -1;
  bool             colInOrder = true;

  if (NULL == pTSchema || NULL == *pTSchema) {
    *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
  }

  for (int c = 0; c < boundInfo->numOfBound; ++c) {
    SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
    if (pColSchema->colId <= lastColId) {
      colInOrder = false;
    } else {
      lastColId = pColSchema->colId;
    }

    if (bind[c].num != rowNum) {
      code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
      goto _return;
    }

    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
        bind[c].buffer_type != pColSchema->type) {  // for rowNum == 1, connector may not set buffer_type
      code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
      goto _return;
    }

    if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
      code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind, charsetCxt);
      if (code) {
        goto _return;
      }
      pBindInfos[c].bind = &ncharBind;
    } else if (TSDB_DATA_TYPE_GEOMETRY == pColSchema->type) {
      code = initCtxAsText();
      if (code) {
        qError("geometry init failed:%s", tstrerror(code));
        goto _return;
      }
      uint8_t *buf = bind[c].buffer;
      for (int j = 0; j < bind[c].num; j++) {
        code = checkWKB(buf, bind[c].length[j]);
        if (code) {
          qError("geometry data must be in WKB format");
          goto _return;
        }
        buf += bind[c].length[j];
      }
      pBindInfos[c].bind = bind + c;
    } else {
      pBindInfos[c].bind = bind + c;
    }

    pBindInfos[c].columnId = pColSchema->colId;
    pBindInfos[c].type = pColSchema->type;
    pBindInfos[c].bytes = pColSchema->bytes;

    if (code) {
      goto _return;
    }
  }

  pDataBlock->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
  if (pDataBlock->pData->pCreateTbReq != NULL) {
    pDataBlock->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
  }

  code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered,
                            &pDataBlock->duplicateTs);
  qDebug("stmt2 all %d columns bind %d rows data as row format", boundInfo->numOfBound, rowNum);

_return:

  taosMemoryFree(ncharBind.buffer);
  taosMemoryFree(ncharBind.length);

  return code;
}

int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum,
                         TAOS_FIELD_E** fields, uint8_t timePrec) {
  if (fields != NULL) {
|
@ -1114,15 +1223,19 @@ int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
  int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);

  for (int32_t i = 0; i < colNum; ++i) {
    SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
    if (pCol == NULL) {
      qError("qResetStmtDataBlock column is NULL");
      return terrno;
    }
    if (deepClear) {
      tColDataDeepClear(pCol);
    if (pBlock->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
      if (pCol == NULL) {
        qError("qResetStmtDataBlock column is NULL");
        return terrno;
      }
      if (deepClear) {
        tColDataDeepClear(pCol);
      } else {
        tColDataClear(pCol);
      }
    } else {
      tColDataClear(pCol);
      pBlock->pData->aRowP = taosArrayInit(20, POINTER_BYTES);
    }
  }
|
@ -361,7 +361,8 @@ static SKeyword keywordTable[] = {
    {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
    {"REGEXP", TK_REGEXP},
    {"ASSIGN", TK_ASSIGN},
    {"TRUE_FOR", TK_TRUE_FOR}
    {"TRUE_FOR", TK_TRUE_FOR},
    {"META_ONLY", TK_META_ONLY}
};
// clang-format on
|
|
@ -11371,9 +11371,11 @@ static int32_t translateCompactRange(STranslateContext* pCxt, const char* dbName
}

static int32_t translateCompactDb(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) {
  SCompactDbReq compactReq = {0};
  SName         name;
  int32_t       code = TSDB_CODE_SUCCESS;
  SCompactDbReq compactReq = {
      .metaOnly = pStmt->metaOnly,
  };
  SName   name;
  int32_t code = TSDB_CODE_SUCCESS;
  code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
  if (TSDB_CODE_SUCCESS != code) return code;

@ -11440,7 +11442,9 @@ static int32_t translateVgroupList(STranslateContext* pCxt, SNodeList* vgroupLis
static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsStmt* pStmt) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SName         name;
  SCompactDbReq req = {0};
  SCompactDbReq req = {
      .metaOnly = pStmt->metaOnly,
  };

  code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal,
                        strlen(((SValueNode*)pStmt->pDbName)->literal));
|
|
@ -1813,3 +1813,6 @@
,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py
,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py

# new test
,,y,test_new,./pytest.sh python3 ./test.py -f storage/compact/test_compact_meta.py
|
|
|
@ -22,6 +22,7 @@ from util.log import *
import platform
import ast


class TDCase:
    def __init__(self, name, case):
        self.name = name

@ -54,8 +55,9 @@ class TDCases:
    def get_local_classes_in_order(self, file_path):
        with open(file_path, "r", encoding="utf-8") as file:
            tree = ast.parse(file.read(), filename=file_path)

        classes = [node.name for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]
        classes = [node.name for node in ast.walk(
            tree) if isinstance(node, ast.ClassDef)]
        return classes

    def runAllLinux(self, conn):

@ -123,7 +125,7 @@ class TDCases:
        class_names = self.get_local_classes_in_order(fileName)
        case_class = getattr(testModule, class_names[-1])
        case = case_class()
        case.init(conn, self._logSql,replicaVar)
        case.init(conn, self._logSql, replicaVar)
        try:
            case.run()
        except Exception as e:

@ -196,19 +198,20 @@ class TDCases:
        else:
            tdLog.info("taosBenchmark found in %s" % paths[0])
            return paths[0]

    def taosBenchmarkExec(self, param):
        buildPath = tdCases.getTaosBenchmarkPath()

        if (platform.system().lower() == 'windows'):
            cmdStr1 = ' mintty -h never %s %s '%(buildPath, param)
            cmdStr1 = ' mintty -h never %s %s ' % (buildPath, param)
            tdLog.info(cmdStr1)
            os.system(cmdStr1)
        else:
            cmdStr1 = '%s %s &'%(buildPath, param)
            cmdStr1 = '%s %s &' % (buildPath, param)
            tdLog.info(cmdStr1)
            os.system(cmdStr1)

        time.sleep(5)


tdCases = TDCases()
|
|
|
@ -0,0 +1,248 @@
|
|||
# tests/test_new/xxx/xxx/test_xxx.py
|
||||
# import ...
|
||||
'''
|
||||
./pytest.sh python3 ./test.py -f storage/compact/test_compact_meta.py
|
||||
'''
|
||||
|
||||
import taos
|
||||
import sys
|
||||
from math import inf
|
||||
|
||||
from util.dnodes import tdDnodes
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.log import *
|
||||
import inspect
|
||||
import random
|
||||
|
||||
sys.path.append("../tests/pytest")
|
||||
|
||||
|
||||
class TestCompactMeta:
|
||||
def caseDescription(self):
|
||||
'''
|
||||
case1<Hongze Cheng>: [TS-5445] Compact Meta Data
|
||||
'''
|
||||
return
|
||||
|
||||
def init(self, conn, logSql, replicaVer=1):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
self.conn = conn
|
||||
|
||||
def run(self):
|
||||
self.test_case1()
|
||||
self.test_case2()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
def test_case1(self):
|
||||
"""
|
||||
Description:
|
||||
1. Alter child table tags
|
||||
2. Make sure compact meta works
|
||||
"""
|
||||
tdLog.info(f'case {inspect.currentframe().f_code.co_name} start')
|
||||
|
||||
db_name = 'db1'
|
||||
stb_name = 'stb1'
|
||||
ctb_name_prefix = 'ctb'
|
||||
num_child_tables = 10000
|
||||
|
||||
# Drop database
|
||||
sql = f'drop database if exists {db_name}'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Create database
|
||||
sql = f'create database {db_name} vgroups 1'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Create super table
|
||||
sql = f'create table {db_name}.{stb_name} (ts timestamp, c1 int, c2 int) tags(t1 int)'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Create child tables
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: create {num_child_tables} child tables')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: create {i} child tables')
|
||||
sql = f'create table {db_name}.{ctb_name_prefix}{i} using {db_name}.{stb_name} tags({i})'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Insert some data
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert data to child tables')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert data to {i} child tables')
|
||||
sql = f'insert into {db_name}.{ctb_name_prefix}{i} values(now, 1, 2)'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Alter child table tags
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: alter child table tags')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: altered {i} child tables')
|
||||
sql = f'alter table {db_name}.{ctb_name_prefix}{i} set tag t1 = {i+1}'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Randomly select 100 child tables to do query
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: randomly select 100 child tables to query')
|
||||
selected_tables = random.sample(range(1, num_child_tables + 1), 100)
|
||||
for i, table_idx in enumerate(selected_tables):
|
||||
# Query data from the child table
|
||||
sql = f'select count(*) from {db_name}.{stb_name} where t1 = {table_idx + 1}'
|
||||
tdSql.query(sql)
|
||||
tdSql.checkData(0, 0, 1)  # expect exactly one row for this tag value
|
||||
|
||||
# Compact meta
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: start to compact meta')
|
||||
sql = f'compact database {db_name} meta_only'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Wait for the compact is done
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: wait compact is done')
|
||||
while True:
|
||||
sql = 'show compacts'
|
||||
rows = tdSql.query(sql)
|
||||
if rows == 0:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
# Write more data
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert more data to child tables')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert data to {i} child tables')
|
||||
sql = f'insert into {db_name}.{ctb_name_prefix}{i} values(now, 1, 2)'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Randomly select 100 child tables to do query
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: query data again to verify')
|
||||
for i, table_idx in enumerate(selected_tables):
|
||||
# Query data from the child table
|
||||
sql = f'select count(*) from {db_name}.{stb_name} where t1 = {table_idx + 1}'
|
||||
tdSql.query(sql)
|
||||
tdSql.checkData(0, 0, 2)  # each table now holds two rows
|
||||
|
||||
def test_case2(self):
|
||||
"""
|
||||
Description:
|
||||
1. Alter super table schema
|
||||
2. Make sure compact meta works
|
||||
"""
|
||||
tdLog.info(f'case {inspect.currentframe().f_code.co_name} start')
|
||||
|
||||
db_name = 'db2'
|
||||
stb_name = 'stb2'
|
||||
ctb_name_prefix = 'ctb'
|
||||
num_child_tables = 1000
|
||||
|
||||
# Drop database
|
||||
sql = f'drop database if exists {db_name}'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Create database
|
||||
sql = f'create database {db_name} vgroups 1'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Create super table
|
||||
sql = f'create table {db_name}.{stb_name} (ts timestamp, c1 int, c2 int) tags(t1 int)'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Create child tables
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: create {num_child_tables} child tables')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: create {i} child tables')
|
||||
sql = f'create table {db_name}.{ctb_name_prefix}{i} using {db_name}.{stb_name} tags({i})'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Insert some data
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert data to child tables')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert data to {i} child tables')
|
||||
sql = f'insert into {db_name}.{ctb_name_prefix}{i} (ts, c1) values (now, 1)'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Alter super table schema
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: alter super table schema')
|
||||
for i in range(3, 2000):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: altered {i} times of super table schema')
|
||||
# Add a column
|
||||
sql = f'alter table {db_name}.{stb_name} add column c{i} int'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Drop a column
|
||||
sql = f'alter table {db_name}.{stb_name} drop column c{i}'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Randomly select 100 child tables to do query
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: randomly select 100 child tables to query')
|
||||
selected_tables = random.sample(range(1, num_child_tables + 1), 100)
|
||||
for i, table_idx in enumerate(selected_tables):
|
||||
# Query data from the child table
|
||||
sql = f'select count(*) from {db_name}.{stb_name} where t1 = {table_idx + 1}'
|
||||
tdSql.query(sql)
|
||||
tdSql.checkData(0, 0, 1)  # expect exactly one row for this tag value
|
||||
|
||||
# Compact meta
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: start to compact meta')
|
||||
sql = f'compact database {db_name} meta_only'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Wait for the compact is done
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: wait compact is done')
|
||||
while True:
|
||||
sql = 'show compacts'
|
||||
rows = tdSql.query(sql)
|
||||
if rows == 0:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
# Write more data
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert more data to child tables')
|
||||
for i in range(1, num_child_tables+1):
|
||||
if i % 100 == 0:
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: insert data to {i} child tables')
|
||||
sql = f'insert into {db_name}.{ctb_name_prefix}{i} values(now, 1, 2)'
|
||||
tdSql.execute(sql)
|
||||
|
||||
# Randomly select 100 child tables to do query
|
||||
tdLog.info(
|
||||
f'case {inspect.currentframe().f_code.co_name}: query data again to verify')
|
||||
for i, table_idx in enumerate(selected_tables):
|
||||
# Query data from the child table
|
||||
sql = f'select count(*) from {db_name}.{stb_name} where t1 = {table_idx + 1}'
|
||||
tdSql.query(sql)
|
||||
tdSql.checkData(0, 0, 2)  # each table now holds two rows
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TestCompactMeta())
|
||||
tdCases.addLinux(__file__, TestCompactMeta())
|
|
@ -24,7 +24,6 @@ import platform
|
|||
import socket
|
||||
import threading
|
||||
import importlib
|
||||
import ast
|
||||
print(f"Python version: {sys.version}")
|
||||
print(f"Version info: {sys.version_info}")
|
||||
|
||||
|
@ -40,14 +39,15 @@ import taos
|
|||
import taosrest
|
||||
import taosws
|
||||
|
||||
|
||||
def checkRunTimeError():
|
||||
import win32gui
|
||||
timeCount = 0
|
||||
while 1:
|
||||
time.sleep(1)
|
||||
timeCount = timeCount + 1
|
||||
print("checkRunTimeError",timeCount)
|
||||
if (timeCount>1200):
|
||||
print("checkRunTimeError", timeCount)
|
||||
if (timeCount > 1200):
|
||||
print("stop the test.")
|
||||
os.system("TASKKILL /F /IM taosd.exe")
|
||||
os.system("TASKKILL /F /IM taos.exe")
|
||||
|
@ -55,15 +55,18 @@ def checkRunTimeError():
|
|||
os.system("TASKKILL /F /IM mintty.exe")
|
||||
os.system("TASKKILL /F /IM python.exe")
|
||||
quit(0)
|
||||
hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library")
|
||||
hwnd = win32gui.FindWindow(
|
||||
None, "Microsoft Visual C++ Runtime Library")
|
||||
if hwnd:
|
||||
os.system("TASKKILL /F /IM taosd.exe")
|
||||
|
||||
|
||||
def get_local_classes_in_order(file_path):
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
tree = ast.parse(file.read(), filename=file_path)
|
||||
|
||||
classes = [node.name for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]
|
||||
|
||||
classes = [node.name for node in ast.walk(
|
||||
tree) if isinstance(node, ast.ClassDef)]
|
||||
return classes
|
||||
|
||||
|
||||
|
@ -74,6 +77,8 @@ def dynamicLoadModule(fileName):
|
|||
#
|
||||
# run case on previous cluster
|
||||
#
|
||||
|
||||
|
||||
def runOnPreviousCluster(host, config, fileName):
|
||||
print("enter run on previeous")
|
||||
|
||||
|
@ -85,7 +90,7 @@ def runOnPreviousCluster(host, config, fileName):
|
|||
uModule = dynamicLoadModule(fileName)
|
||||
class_names = get_local_classes_in_order(fileName)
|
||||
case_class = getattr(uModule, class_names[-1])
|
||||
case = case_class()
|
||||
case = case_class()
|
||||
|
||||
# create conn
|
||||
conn = taos.connect(host, config)
|
||||
|
@ -130,7 +135,7 @@ if __name__ == "__main__":
|
|||
previousCluster = False
|
||||
crashGen = False
|
||||
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP:G', [
|
||||
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode','previous',"crashGen"])
|
||||
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd', 'dnodeNums', 'mnodeNums', 'queryPolicy', 'createDnodeNums', 'restful', 'websocket', 'adaptercfgupdate', 'replicaVar', 'independentMnode', 'previous', "crashGen"])
|
||||
for key, value in opts:
|
||||
if key in ['-h', '--help']:
|
||||
tdLog.printNoPrefix(
|
||||
|
@ -156,7 +161,8 @@ if __name__ == "__main__":
|
|||
tdLog.printNoPrefix('-n the number of replicas')
|
||||
tdLog.printNoPrefix('-i independentMnode Mnode')
|
||||
tdLog.printNoPrefix('-a address sanitizer mode')
|
||||
tdLog.printNoPrefix('-P run case with [P]revious cluster, do not create new cluster to run case.')
|
||||
tdLog.printNoPrefix(
|
||||
'-P run case with [P]revious cluster, do not create new cluster to run case.')
|
||||
tdLog.printNoPrefix('-G crashGen mode')
|
||||
|
||||
sys.exit(0)
|
||||
|
@ -234,7 +240,8 @@ if __name__ == "__main__":
|
|||
|
||||
if key in ['-D', '--adaptercfgupdate']:
|
||||
try:
|
||||
adaptercfgupdate = eval(base64.b64decode(value.encode()).decode())
|
||||
adaptercfgupdate = eval(
|
||||
base64.b64decode(value.encode()).decode())
|
||||
except:
|
||||
print('adapter cfg update convert fail.')
|
||||
sys.exit(0)
|
||||
|
@ -248,7 +255,6 @@ if __name__ == "__main__":
|
|||
if key in ['-G', '--crashGen']:
|
||||
crashGen = True
|
||||
|
||||
|
||||
#
|
||||
# do exeCmd command
|
||||
#
|
||||
|
@ -275,7 +281,7 @@ if __name__ == "__main__":
|
|||
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
|
||||
processID = subprocess.check_output(psCmd, shell=True)
|
||||
|
||||
while(processID):
|
||||
while (processID):
|
||||
os.system(killCmd)
|
||||
time.sleep(1)
|
||||
processID = subprocess.check_output(psCmd, shell=True)
|
||||
|
@ -302,7 +308,7 @@ if __name__ == "__main__":
|
|||
# psCmd = f"pgrep {toBeKilled}"
|
||||
processID = subprocess.check_output(psCmd, shell=True)
|
||||
|
||||
while(processID):
|
||||
while (processID):
|
||||
os.system(killCmd)
|
||||
time.sleep(1)
|
||||
processID = subprocess.check_output(psCmd, shell=True)
|
||||
|
@ -349,7 +355,7 @@ if __name__ == "__main__":
|
|||
if platform.system().lower() == 'windows':
|
||||
fileName = fileName.replace("/", os.sep)
|
||||
if (masterIp == "" and not fileName == "0-others\\udf_create.py"):
|
||||
threading.Thread(target=checkRunTimeError,daemon=True).start()
|
||||
threading.Thread(target=checkRunTimeError, daemon=True).start()
|
||||
tdLog.info("Procedures for testing self-deployment")
|
||||
tdDnodes.init(deployPath, masterIp)
|
||||
tdDnodes.setTestCluster(testCluster)
|
||||
|
@ -372,7 +378,8 @@ if __name__ == "__main__":
|
|||
ucase = case_class()
|
||||
if ((json.dumps(updateCfgDict) == '{}') and hasattr(ucase, 'updatecfgDict')):
|
||||
updateCfgDict = ucase.updatecfgDict
|
||||
updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode()
|
||||
updateCfgDictStr = "-d %s" % base64.b64encode(
|
||||
json.dumps(updateCfgDict).encode()).decode()
|
||||
if ((json.dumps(adapter_cfg_dict) == '{}') and hasattr(ucase, 'taosadapter_cfg_dict')):
|
||||
adapter_cfg_dict = ucase.taosadapter_cfg_dict
|
||||
# adapter_cfg_dict_str = f"-D {base64.b64encode(toml.dumps(adapter_cfg_dict).encode()).decode()}"
|
||||
|
@ -384,8 +391,8 @@ if __name__ == "__main__":
|
|||
tAdapter.init(deployPath, masterIp)
|
||||
tAdapter.stop(force_kill=True)
|
||||
|
||||
if dnodeNums == 1 :
|
||||
tdDnodes.deploy(1,updateCfgDict)
|
||||
if dnodeNums == 1:
|
||||
tdDnodes.deploy(1, updateCfgDict)
|
||||
tdDnodes.start(1)
|
||||
tdCases.logSql(logSql)
|
||||
if restful or websocket:
|
||||
|
@ -393,13 +400,15 @@ if __name__ == "__main__":
|
|||
tAdapter.start()
|
||||
|
||||
if queryPolicy != 1:
|
||||
queryPolicy=int(queryPolicy)
|
||||
queryPolicy = int(queryPolicy)
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
conn = taosws.connect(
|
||||
f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(host, config=tdDnodes.getSimCfgPath())
|
||||
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("create qnode on dnode 1")
|
||||
|
@ -407,16 +416,20 @@ if __name__ == "__main__":
|
|||
cursor.execute("show local variables")
|
||||
res = cursor.fetchall()
|
||||
for i in range(cursor.rowcount):
|
||||
if res[i][0] == "queryPolicy" :
|
||||
if res[i][0] == "queryPolicy":
|
||||
if int(res[i][1]) == int(queryPolicy):
|
||||
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
|
||||
tdLog.info(
|
||||
f'alter queryPolicy to {queryPolicy} successfully')
|
||||
cursor.close()
|
||||
else:
|
||||
tdLog.debug(res)
|
||||
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
|
||||
else :
|
||||
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
|
||||
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
|
||||
tdLog.exit(
|
||||
f"alter queryPolicy to {queryPolicy} failed")
|
||||
else:
|
||||
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode" % (
|
||||
dnodeNums, mnodeNums))
|
||||
dnodeslist = cluster.configure_cluster(
|
||||
dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
|
||||
tdDnodes = ClusterDnodes(dnodeslist)
|
||||
tdDnodes.init(deployPath, masterIp)
|
||||
tdDnodes.setTestCluster(testCluster)
|
||||
|
@ -433,31 +446,34 @@ if __name__ == "__main__":
|
|||
tAdapter.start()
|
||||
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(host, config=tdDnodes.getSimCfgPath())
|
||||
# tdLog.info(tdDnodes.getSimCfgPath(),host)
|
||||
if createDnodeNums == 1:
|
||||
createDnodeNums=dnodeNums
|
||||
createDnodeNums = dnodeNums
|
||||
else:
|
||||
createDnodeNums=createDnodeNums
|
||||
cluster.create_dnode(conn,createDnodeNums)
|
||||
cluster.create_mnode(conn,mnodeNums)
|
||||
createDnodeNums = createDnodeNums
|
||||
cluster.create_dnode(conn, createDnodeNums)
|
||||
cluster.create_mnode(conn, mnodeNums)
|
||||
try:
|
||||
if cluster.check_dnode(conn) :
|
||||
if cluster.check_dnode(conn):
|
||||
print("check dnode ready")
|
||||
except Exception as r:
|
||||
print(r)
|
||||
if queryPolicy != 1:
|
||||
queryPolicy=int(queryPolicy)
|
||||
queryPolicy = int(queryPolicy)
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
conn = taosws.connect(
|
||||
f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(host, config=tdDnodes.getSimCfgPath())
|
||||
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("create qnode on dnode 1")
|
||||
|
@ -465,23 +481,27 @@ if __name__ == "__main__":
|
|||
cursor.execute("show local variables")
|
||||
res = cursor.fetchall()
|
||||
for i in range(cursor.rowcount):
|
||||
if res[i][0] == "queryPolicy" :
|
||||
if res[i][0] == "queryPolicy":
|
||||
if int(res[i][1]) == int(queryPolicy):
|
||||
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
|
||||
tdLog.info(
|
||||
f'alter queryPolicy to {queryPolicy} successfully')
|
||||
cursor.close()
|
||||
else:
|
||||
tdLog.debug(res)
|
||||
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
|
||||
tdLog.exit(
|
||||
f"alter queryPolicy to {queryPolicy} failed")
|
||||
|
||||
if ucase is not None and hasattr(ucase, 'noConn') and ucase.noConn == True:
|
||||
conn = None
|
||||
else:
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
|
||||
if testCluster:
|
||||
tdLog.info("Procedures for testing cluster")
|
||||
|
@ -492,11 +512,13 @@ if __name__ == "__main__":
|
|||
else:
|
||||
tdLog.info("Procedures for testing self-deployment")
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
|
||||
if fileName == "all":
|
||||
tdCases.runAllWindows(conn)
|
||||
|
@ -513,14 +535,19 @@ if __name__ == "__main__":
|
|||
tdDnodes.start(1)
|
||||
time.sleep(1)
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
conn = taosws.connect(
|
||||
f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
tdLog.info("Procedures for tdengine deployed in %s" % (host))
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
tdLog.info(
|
||||
"Procedures for tdengine deployed in %s" % (host))
|
||||
tdLog.info("query test after taosd restart")
|
||||
tdCases.runOneWindows(conn, sp[0] + "_" + "restart.py", replicaVar)
|
||||
tdCases.runOneWindows(
|
||||
conn, sp[0] + "_" + "restart.py", replicaVar)
|
||||
else:
|
||||
tdLog.info("not need to query")
|
||||
else:
|
||||
|
@ -542,7 +569,7 @@ if __name__ == "__main__":
|
|||
try:
|
||||
class_names = get_local_classes_in_order(fileName)
|
||||
case_class = getattr(uModule, class_names[-1])
|
||||
ucase = case_class()
|
||||
ucase = case_class()
|
||||
if (json.dumps(updateCfgDict) == '{}'):
|
||||
updateCfgDict = ucase.updatecfgDict
|
||||
if (json.dumps(adapter_cfg_dict) == '{}'):
|
||||
|
@ -554,9 +581,9 @@ if __name__ == "__main__":
|
|||
tAdapter.init(deployPath, masterIp)
|
||||
tAdapter.stop(force_kill=True)
|
||||
|
||||
if dnodeNums == 1 :
|
||||
if dnodeNums == 1:
|
||||
# dnode is one
|
||||
tdDnodes.deploy(1,updateCfgDict)
|
||||
tdDnodes.deploy(1, updateCfgDict)
|
||||
tdDnodes.start(1)
|
||||
tdCases.logSql(logSql)
|
||||
|
||||
|
@ -565,13 +592,16 @@ if __name__ == "__main__":
|
|||
tAdapter.start()
|
||||
|
||||
if queryPolicy != 1:
|
||||
queryPolicy=int(queryPolicy)
|
||||
queryPolicy = int(queryPolicy)
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
conn = taosws.connect(
|
||||
f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
# tdSql.init(conn.cursor())
|
||||
# tdSql.execute("create qnode on dnode 1")
|
||||
# tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
|
||||
|
@ -590,19 +620,23 @@ if __name__ == "__main__":
|
|||
cursor.execute("show local variables")
|
||||
res = cursor.fetchall()
|
||||
for i in range(cursor.rowcount):
|
||||
if res[i][0] == "queryPolicy" :
|
||||
if res[i][0] == "queryPolicy":
|
||||
if int(res[i][1]) == int(queryPolicy):
|
||||
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
|
||||
tdLog.info(
|
||||
f'alter queryPolicy to {queryPolicy} successfully')
|
||||
cursor.close()
|
||||
else:
|
||||
tdLog.debug(res)
|
||||
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
|
||||
tdLog.exit(
|
||||
f"alter queryPolicy to {queryPolicy} failed")
|
||||
|
||||
else :
|
||||
else:
|
||||
# dnode > 1 cluster
|
||||
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
|
||||
print(independentMnode,"independentMnode valuse")
|
||||
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
|
||||
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode" % (
|
||||
dnodeNums, mnodeNums))
|
||||
print(independentMnode, "independentMnode valuse")
|
||||
dnodeslist = cluster.configure_cluster(
|
||||
dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
|
||||
tdDnodes = ClusterDnodes(dnodeslist)
|
||||
tdDnodes.init(deployPath, masterIp)
|
||||
tdDnodes.setTestCluster(testCluster)
|
||||
|
@ -610,7 +644,7 @@ if __name__ == "__main__":
|
|||
tdDnodes.setAsan(asan)
|
||||
tdDnodes.stopAll()
|
||||
for dnode in tdDnodes.dnodes:
|
||||
tdDnodes.deploy(dnode.index,updateCfgDict)
|
||||
tdDnodes.deploy(dnode.index, updateCfgDict)
|
||||
for dnode in tdDnodes.dnodes:
|
||||
tdDnodes.starttaosd(dnode.index)
|
||||
tdCases.logSql(logSql)
|
||||
|
@ -621,34 +655,39 @@ if __name__ == "__main__":
|
|||
|
||||
# create taos connect
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
print(tdDnodes.getSimCfgPath(),host)
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
print(tdDnodes.getSimCfgPath(), host)
|
||||
if createDnodeNums == 1:
|
||||
createDnodeNums=dnodeNums
|
||||
createDnodeNums = dnodeNums
|
||||
else:
|
||||
createDnodeNums=createDnodeNums
|
||||
cluster.create_dnode(conn,createDnodeNums)
|
||||
cluster.create_mnode(conn,mnodeNums)
|
||||
createDnodeNums = createDnodeNums
|
||||
cluster.create_dnode(conn, createDnodeNums)
|
||||
cluster.create_mnode(conn, mnodeNums)
|
||||
|
||||
try:
|
||||
if cluster.check_dnode(conn) :
|
||||
if cluster.check_dnode(conn):
|
||||
print("check dnode ready")
|
||||
except Exception as r:
|
||||
print(r)
|
||||
|
||||
# do queryPolicy option
|
||||
if queryPolicy != 1:
|
||||
queryPolicy=int(queryPolicy)
|
||||
queryPolicy = int(queryPolicy)
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
conn = taosws.connect(
|
||||
f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("create qnode on dnode 1")
|
||||
|
@ -656,14 +695,15 @@ if __name__ == "__main__":
|
|||
cursor.execute("show local variables")
|
||||
res = cursor.fetchall()
|
||||
for i in range(cursor.rowcount):
|
||||
if res[i][0] == "queryPolicy" :
|
||||
if res[i][0] == "queryPolicy":
|
||||
if int(res[i][1]) == int(queryPolicy):
|
||||
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
|
||||
tdLog.info(
|
||||
f'alter queryPolicy to {queryPolicy} successfully')
|
||||
cursor.close()
|
||||
else:
|
||||
tdLog.debug(res)
|
||||
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
|
||||
|
||||
tdLog.exit(
|
||||
f"alter queryPolicy to {queryPolicy} failed")
|
||||
|
||||
# run case
|
||||
if testCluster:
|
||||
|
@ -675,11 +715,13 @@ if __name__ == "__main__":
|
|||
else:
|
||||
tdLog.info("Procedures for testing self-deployment")
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
|
||||
if fileName == "all":
|
||||
tdCases.runAllLinux(conn)
|
||||
|
@ -697,14 +739,19 @@ if __name__ == "__main__":
|
|||
tdDnodes.start(1)
|
||||
time.sleep(1)
|
||||
if restful:
|
||||
conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc")
|
||||
conn = taosrest.connect(
|
||||
url=f"http://{host}:6041", timezone="utc")
|
||||
elif websocket:
|
||||
conn = taosws.connect(f"taosws://root:taosdata@{host}:6041")
|
||||
conn = taosws.connect(
|
||||
f"taosws://root:taosdata@{host}:6041")
|
||||
else:
|
||||
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
tdLog.info("Procedures for tdengine deployed in %s" % (host))
|
||||
conn = taos.connect(
|
||||
host=f"{host}", config=tdDnodes.getSimCfgPath())
|
||||
tdLog.info(
|
||||
"Procedures for tdengine deployed in %s" % (host))
|
||||
tdLog.info("query test after taosd restart")
|
||||
tdCases.runOneLinux(conn, sp[0] + "_" + "restart.py", replicaVar)
|
||||
tdCases.runOneLinux(
|
||||
conn, sp[0] + "_" + "restart.py", replicaVar)
|
||||
else:
|
||||
tdLog.info("not need to query")
|
||||
|
||||
|
|