feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e848e5528d57f5a7c9fa7600c4742a71c.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4c587bc126d3f75480d81e637d7601e6.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.
This commit is contained in:
Zhixiao Bao 2025-03-14 16:10:13 +08:00 committed by GitHub
parent b3e38dbc96
commit af7e34e189
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1253 additions and 19 deletions

View File

@ -20,6 +20,7 @@ table_options:
table_option: {
COMMENT 'string_value'
| SMA(col_name [, col_name] ...)
| KEEP value
}
```
@ -34,6 +35,7 @@ table_option: {
- TAGS can have up to 128 columns, at least 1, with a total length not exceeding 16 KB.
4. For the use of `ENCODE` and `COMPRESS`, please refer to [Column Compression](../manage-data-compression/)
5. For explanations of parameters in table_option, please refer to [Table SQL Description](../manage-tables/)
6. Regarding the keep parameter in table_option, it only takes effect for super tables. For detailed explanation of the keep parameter, please refer to [Database Description](02-database.md). The only difference is that the super table's keep parameter does not immediately affect query results, but only takes effect after compaction.
## View Supertables
@ -144,6 +146,7 @@ alter_table_options:
alter_table_option: {
COMMENT 'string_value'
| KEEP value
}
```

View File

@ -21,6 +21,7 @@ table_options:
table_option: {
COMMENT 'string_value'
| SMA(col_name [, col_name] ...)
| KEEP value
}
```
@ -34,6 +35,7 @@ table_option: {
- TAGS 最多允许 128 个,至少 1 个,总长度不超过 16 KB。
4. 关于 `ENCODE``COMPRESS` 的使用,请参考 [按列压缩](../compress)
5. 关于 table_option 中的参数说明,请参考 [建表 SQL 说明](../table)
6. 关于 table_option 中的 keep 参数仅对超级表生效keep 参数的详细说明可以参考 [数据库说明](02-database.md),唯一不同的是超级表 keep 不会立即影响查询结果,仅在 compact 后生效。
## 查看超级表
@ -145,6 +147,7 @@ alter_table_options:
alter_table_option: {
COMMENT 'string_value'
| KEEP value
}
```

View File

@ -948,6 +948,7 @@ typedef struct {
int64_t deleteMark2;
int32_t sqlLen;
char* sql;
int64_t keep;
} SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
@ -986,6 +987,7 @@ typedef struct {
char* comment;
int32_t sqlLen;
char* sql;
int64_t keep;
} SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
@ -3240,6 +3242,7 @@ typedef struct SVCreateStbReq {
int8_t source;
int8_t colCmpred;
SColCmprWrapper colCmpr;
int64_t keep;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);

View File

@ -58,6 +58,7 @@ typedef struct SMetaEntry {
SSchemaWrapper schemaRow;
SSchemaWrapper schemaTag;
SRSmaParam rsmaParam;
int64_t keep;
} stbEntry;
struct {
int64_t btime;
@ -154,6 +155,7 @@ typedef struct {
int64_t uid;
int64_t ctbNum;
int32_t colNum;
int64_t keep;
} SMetaStbStats;
// clang-format off

View File

@ -197,6 +197,8 @@ typedef struct STableOptions {
SNodeList* pRollupFuncs;
int32_t ttl;
SNodeList* pSma;
SValueNode* pKeepNode;
int32_t keep;
} STableOptions;
typedef struct SColumnOptions {

View File

@ -699,6 +699,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
}
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->deleteMark1));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->deleteMark2));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->keep));
ENCODESQL();
@ -809,6 +810,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->deleteMark1));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->deleteMark2));
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->keep));
}
DECODESQL();
@ -916,6 +920,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
if (pReq->commentLen > 0) {
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->comment));
}
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->keep));
ENCODESQL();
tEndEncode(&encoder);
@ -978,7 +983,9 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
}
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->comment));
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->keep));
}
DECODESQL();
tEndDecode(&decoder);
@ -10453,6 +10460,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(pCoder, pReq->colCmpred));
TAOS_CHECK_EXIT(tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr));
TAOS_CHECK_EXIT(tEncodeI64(pCoder, pReq->keep));
tEndEncode(pCoder);
_exit:
@ -10487,6 +10495,9 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if (!tDecodeIsEnd(pCoder)) {
TAOS_CHECK_EXIT(tDecodeSColCmprWrapperEx(pCoder, &pReq->colCmpr));
}
if (!tDecodeIsEnd(pCoder)) {
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pReq->keep));
}
}
tEndDecode(pCoder);

View File

@ -590,6 +590,7 @@ typedef struct {
SRWLatch lock;
int8_t source;
SColCmpr* pCmpr;
int64_t keep;
} SStbObj;
typedef struct {

View File

@ -34,7 +34,7 @@
#include "tname.h"
#define STB_VER_NUMBER 2
#define STB_RESERVE_SIZE 64
#define STB_RESERVE_SIZE 56
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
@ -190,6 +190,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, p->alg, _OVER)
}
}
SDB_SET_INT64(pRaw, dataPos, pStb->keep, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
@ -316,6 +318,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, (int32_t *)&pCmpr->alg, _OVER) // compatiable
}
}
SDB_GET_INT64(pRaw, dataPos, &pStb->keep, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
@ -431,6 +434,8 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->smaVer = pNew->smaVer;
pOld->nextColId = pNew->nextColId;
pOld->ttl = pNew->ttl;
pOld->keep = pNew->keep;
if (pNew->numOfColumns > 0) {
pOld->numOfColumns = pNew->numOfColumns;
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
@ -527,6 +532,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
req.colCmpred = 1;
SColCmprWrapper *pCmpr = &req.colCmpr;
req.keep = pStb->keep;
pCmpr->version = pStb->colVer;
pCmpr->nCols = pStb->numOfColumns;
@ -878,6 +884,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->commentLen = pCreate->commentLen;
pDst->pFuncs = pCreate->pFuncs;
pDst->source = pCreate->source;
pDst->keep = pCreate->keep;
pCreate->pFuncs = NULL;
if (pDst->commentLen > 0) {
@ -1422,6 +1429,7 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
int32_t code = 0;
if (pAlter->commentLen >= 0) return 0;
if (pAlter->ttl != 0) return 0;
if (pAlter->keep != -1) return 0;
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
code = TSDB_CODE_MND_INVALID_STB_OPTION;
@ -1454,8 +1462,8 @@ int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
TAOS_RETURN(0);
}
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
int32_t ttl) {
static int32_t mndUpdateTableOptions(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
int32_t ttl, int64_t keep) {
int32_t code = 0;
if (commentLen > 0) {
pNew->commentLen = commentLen;
@ -1474,6 +1482,10 @@ static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, cha
pNew->ttl = ttl;
}
if (keep > 0) {
pNew->keep = keep;
}
if ((code = mndAllocStbSchemas(pOld, pNew)) != 0) {
TAOS_RETURN(code);
}
@ -2625,7 +2637,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
needRsp = false;
code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
code = mndUpdateTableOptions(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl, pAlter->keep);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);

View File

@ -94,6 +94,7 @@ void *vnodeGetIvtIdx(void *pVnode);
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num);
int32_t vnodeGetStbKeep(SVnode *pVnode, tb_uid_t suid, int64_t *keep);
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
@ -350,6 +351,9 @@ int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader);
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value);
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader);
int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry);
void metaFetchEntryFree(SMetaEntry **ppEntry);
#ifdef __cplusplus
}
#endif

View File

@ -71,7 +71,8 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol);
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid);
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol, int64_t deltaKeep);
int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle);
struct SMeta {

View File

@ -403,6 +403,7 @@ int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
if (*ppEntry) { // update
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
(*ppEntry)->info.colNum = pInfo->colNum;
(*ppEntry)->info.keep = pInfo->keep;
} else { // insert
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
@ -902,3 +903,23 @@ int32_t metaInitTbFilterCache(SMeta* pMeta) {
#endif
return 0;
}
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
return stats.keep;
}
SMetaEntry* pEntry = NULL;
if (metaFetchEntryByUid(pMeta, uid, &pEntry) == TSDB_CODE_SUCCESS) {
int64_t keep = -1;
if (pEntry->type == TSDB_SUPER_TABLE) {
keep = pEntry->stbEntry.keep;
}
metaFetchEntryFree(&pEntry);
return keep;
}
return -1;
}

View File

@ -130,6 +130,9 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
}
TAOS_CHECK_RETURN(meteEncodeColCmprEntry(pCoder, pME));
}
if (pME->type == TSDB_SUPER_TABLE) {
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->stbEntry.keep));
}
tEndEncode(pCoder);
return 0;
@ -209,6 +212,11 @@ int metaDecodeEntryImpl(SDecoder *pCoder, SMetaEntry *pME, bool headerOnly) {
TABLE_SET_COL_COMPRESSED(pME->flags);
}
}
if (pME->type == TSDB_SUPER_TABLE) {
if (!tDecodeIsEnd(pCoder)) {
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->stbEntry.keep));
}
}
tEndDecode(pCoder);
return 0;
@ -310,6 +318,7 @@ int32_t metaCloneEntry(const SMetaEntry *pEntry, SMetaEntry **ppEntry) {
metaCloneEntryFree(ppEntry);
return code;
}
(*ppEntry)->stbEntry.keep = pEntry->stbEntry.keep;
} else if (pEntry->type == TSDB_CHILD_TABLE) {
(*ppEntry)->ctbEntry.btime = pEntry->ctbEntry.btime;
(*ppEntry)->ctbEntry.ttlDays = pEntry->ctbEntry.ttlDays;

View File

@ -1191,7 +1191,7 @@ static int32_t metaHandleChildTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE
}
if (TSDB_CODE_SUCCESS == code) {
metaUpdateStbStats(pMeta, pSuperEntry->uid, 1, 0);
metaUpdateStbStats(pMeta, pSuperEntry->uid, 1, 0, -1);
int32_t ret = metaUidCacheClear(pMeta, pSuperEntry->uid);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
@ -1357,7 +1357,7 @@ static int32_t metaHandleChildTableDropImpl(SMeta *pMeta, const SMetaHandleParam
}
--pMeta->pVnode->config.vndStats.numOfCTables;
metaUpdateStbStats(pMeta, pParam->pSuperEntry->uid, -1, 0);
metaUpdateStbStats(pMeta, pParam->pSuperEntry->uid, -1, 0, -1);
int32_t ret = metaUidCacheClear(pMeta, pSuper->uid);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
@ -1613,7 +1613,8 @@ static int32_t metaHandleSuperTableUpdateImpl(SMeta *pMeta, SMetaHandleParam *pP
}
if (TSDB_CODE_SUCCESS == code) {
metaUpdateStbStats(pMeta, pEntry->uid, 0, pEntry->stbEntry.schemaRow.nCols - pOldEntry->stbEntry.schemaRow.nCols);
metaUpdateStbStats(pMeta, pEntry->uid, 0, pEntry->stbEntry.schemaRow.nCols - pOldEntry->stbEntry.schemaRow.nCols,
pEntry->stbEntry.keep);
}
return code;

View File

@ -1681,10 +1681,14 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t
// slow path: search TDB
int64_t ctbNum = 0;
int32_t colNum = 0;
int64_t keep = 0;
code = vnodeGetCtbNum(pVnode, uid, &ctbNum);
if (TSDB_CODE_SUCCESS == code) {
code = vnodeGetStbColumnNum(pVnode, uid, &colNum);
}
if (TSDB_CODE_SUCCESS == code) {
code = vnodeGetStbKeep(pVnode, uid, &keep);
}
metaULock(pVnodeObj->pMeta);
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
@ -1696,13 +1700,14 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t
state.uid = uid;
state.ctbNum = ctbNum;
state.colNum = colNum;
state.keep = keep;
// upsert the cache
metaWLock(pVnodeObj->pMeta);
int32_t ret = metaStatsCacheUpsert(pVnodeObj->pMeta, &state);
if (ret) {
metaError("failed to upsert stats, uid:%" PRId64 ", ctbNum:%" PRId64 ", colNum:%d", uid, ctbNum, colNum);
metaError("failed to upsert stats, uid:%" PRId64 ", ctbNum:%" PRId64 ", colNum:%d, keep:%" PRId64, uid, ctbNum,
colNum, keep);
}
metaULock(pVnodeObj->pMeta);
@ -1711,16 +1716,20 @@ _exit:
return code;
}
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol) {
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol, int64_t deltaKeep) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
stats.ctbNum += deltaCtb;
stats.colNum += deltaCol;
if (deltaKeep > 0) {
stats.keep = deltaKeep;
}
int32_t code = metaStatsCacheUpsert(pMeta, &stats);
if (code) {
metaError("vgId:%d, failed to update stats, uid:%" PRId64 ", ctbNum:%" PRId64 ", colNum:%d",
TD_VID(pMeta->pVnode), uid, deltaCtb, deltaCol);
metaError("vgId:%d, failed to update stats, uid:%" PRId64 ", ctbNum:%" PRId64 ", colNum:%d, keep:%" PRId64,
TD_VID(pMeta->pVnode), uid, deltaCtb, deltaCol, deltaKeep > 0 ? deltaKeep : stats.keep);
}
}
}

View File

@ -589,7 +589,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
}
--pMeta->pVnode->config.vndStats.numOfCTables;
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0, -1);
ret = metaUidCacheClear(pMeta, e.ctbEntry.suid);
if (ret < 0) {
metaError("vgId:%d, failed to clear uid cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,

View File

@ -180,6 +180,7 @@ int32_t metaCreateSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq
.name = pReq->name,
.stbEntry.schemaRow = pReq->schemaRow,
.stbEntry.schemaTag = pReq->schemaTag,
.stbEntry.keep = pReq->keep,
};
if (pReq->rollup) {
TABLE_SET_ROLLUP(entry.flags);
@ -1760,6 +1761,7 @@ int32_t metaAlterSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq)
.name = pReq->name,
.stbEntry.schemaRow = pReq->schemaRow,
.stbEntry.schemaTag = pReq->schemaTag,
.stbEntry.keep = pReq->keep,
.colCmpr = pReq->colCmpr,
};
TABLE_SET_COL_COMPRESSED(entry.flags);

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tsdb.h"
#include "tsdbFS2.h"

View File

@ -764,6 +764,21 @@ int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetStbKeep(SVnode *pVnode, tb_uid_t suid, int64_t *keep) {
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
int32_t code = metaReaderGetTableEntryByUid(&mr, suid);
if (code == TSDB_CODE_SUCCESS) {
*keep = mr.me.stbEntry.keep;
} else {
*keep = 0; // Default value if not found
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
}
#ifdef TD_ENTERPRISE
const char *tkLogStb[] = {"cluster_info",
"data_dir",

View File

@ -1202,6 +1202,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pOptions->pRollupFuncs);
nodesDestroyList(pOptions->pSma);
nodesDestroyList(pOptions->pDeleteMark);
nodesDestroyNode((SNode*)pOptions->pKeepNode);
break;
}
case QUERY_NODE_COLUMN_OPTIONS: {

View File

@ -82,7 +82,8 @@ typedef enum ETableOptionType {
TABLE_OPTION_ROLLUP,
TABLE_OPTION_TTL,
TABLE_OPTION_SMA,
TABLE_OPTION_DELETE_MARK
TABLE_OPTION_DELETE_MARK,
TABLE_OPTION_KEEP
} ETableOptionType;
typedef enum EColumnOptionType {

View File

@ -532,6 +532,8 @@ table_options(A) ::= table_options(B) ROLLUP NK_LP rollup_func_list(C) NK_RP.
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_SMA, C); }
table_options(A) ::= table_options(B) DELETE_MARK duration_list(C). { A = setTableOption(pCxt, B, TABLE_OPTION_DELETE_MARK, C); }
table_options(A) ::= table_options(B) KEEP NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); }
table_options(A) ::= table_options(B) KEEP NK_VARIABLE(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); }
alter_table_options(A) ::= alter_table_option(B). { A = createAlterTableOptions(pCxt); A = setTableOption(pCxt, A, B.type, &B.val); }
alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). { A = setTableOption(pCxt, B, C.type, &C.val); }
@ -540,6 +542,9 @@ alter_table_options(A) ::= alter_table_options(B) alter_table_option(C).
%destructor alter_table_option { }
alter_table_option(A) ::= COMMENT NK_STRING(B). { A.type = TABLE_OPTION_COMMENT; A.val = B; }
alter_table_option(A) ::= TTL NK_INTEGER(B). { A.type = TABLE_OPTION_TTL; A.val = B; }
alter_table_option(A) ::= KEEP NK_INTEGER(B). { A.type = TABLE_OPTION_KEEP; A.val = B; }
alter_table_option(A) ::= KEEP NK_VARIABLE(B). { A.type = TABLE_OPTION_KEEP; A.val = B; }
%type duration_list { SNodeList* }
%destructor duration_list { nodesDestroyList($$); }

View File

@ -1,4 +1,3 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
@ -625,8 +624,7 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
break;
}
case HINT_SORT_FOR_GROUP:
if (paramNum > 0) return true;
if (hasHint(*ppHintList, HINT_PARTITION_FIRST)) return true;
if (paramNum > 0 || hasHint(*ppHintList, HINT_PARTITION_FIRST)) return true;
break;
case HINT_PARTITION_FIRST:
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
@ -2321,6 +2319,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) {
pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK;
pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK;
pOptions->ttl = TSDB_DEFAULT_TABLE_TTL;
pOptions->keep = -1;
pOptions->commentNull = true; // mark null
return (SNode*)pOptions;
_err:
@ -2334,6 +2333,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) {
CHECK_MAKE_NODE(pOptions);
pOptions->ttl = -1;
pOptions->commentNull = true; // mark null
pOptions->keep = -1;
return (SNode*)pOptions;
_err:
return NULL;
@ -2373,6 +2373,13 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
case TABLE_OPTION_DELETE_MARK:
((STableOptions*)pOptions)->pDeleteMark = pVal;
break;
case TABLE_OPTION_KEEP:
if (TK_NK_INTEGER == ((SToken*)pVal)->type) {
((STableOptions*)pOptions)->keep = taosStr2Int32(((SToken*)pVal)->z, NULL, 10) * 1440;
} else {
((STableOptions*)pOptions)->pKeepNode = (SValueNode*)createDurationValueNode(pCxt, (SToken*)pVal);
}
break;
default:
break;
}

View File

@ -9405,6 +9405,36 @@ static int32_t checkColumnOptions(SNodeList* pList) {
}
return TSDB_CODE_SUCCESS;
}
static int32_t checkTableKeepOption(STranslateContext* pCxt, STableOptions* pOptions, bool createStable) {
if (pOptions == NULL || (pOptions->keep == -1 && pOptions->pKeepNode == NULL)) {
return TSDB_CODE_SUCCESS;
}
if (!createStable) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"KEEP parameter is not allowed when creating normal table");
}
if (pOptions && pOptions->pKeepNode) {
if (DEAL_RES_ERROR == translateValue(pCxt, pOptions->pKeepNode)) {
return pCxt->errCode;
}
if (pOptions->pKeepNode->unit != TIME_UNIT_DAY && pOptions->pKeepNode->unit != TIME_UNIT_HOUR &&
pOptions->pKeepNode->unit != TIME_UNIT_MINUTE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option keep unit: %c, only %c, %c, %c allowed", pOptions->pKeepNode->unit,
TIME_UNIT_DAY, TIME_UNIT_HOUR, TIME_UNIT_MINUTE);
}
pOptions->keep = pOptions->pKeepNode->datum.i / 60 / 1000;
}
if (pOptions->keep < TSDB_MIN_KEEP || pOptions->keep > TSDB_MAX_KEEP) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_TSC_VALUE_OUT_OF_RANGE,
"Invalid option keep value: %lld, should be in range [%d, %d]", pOptions->keep,
TSDB_MIN_KEEP, TSDB_MAX_KEEP);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateS3MigrateDatabase(STranslateContext* pCxt, SS3MigrateDatabaseStmt* pStmt) {
SS3MigrateDbReq req = {0};
SName name = {0};
@ -9853,6 +9883,9 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
if (TSDB_CODE_SUCCESS == code) {
code = checkColumnOptions(pStmt->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkTableKeepOption(pCxt, pStmt->pOptions, createStable);
}
if (TSDB_CODE_SUCCESS == code) {
if (createStable && pStmt->pOptions->ttl != 0) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
@ -10251,6 +10284,7 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq->watermark2 = pStmt->pOptions->watermark2;
pReq->deleteMark1 = pStmt->pOptions->deleteMark1;
pReq->deleteMark2 = pStmt->pOptions->deleteMark2;
pReq->keep = pStmt->pOptions->keep;
pReq->colVer = 1;
pReq->tagVer = 1;
pReq->source = TD_REQ_FROM_APP;
@ -10347,6 +10381,10 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
pAlterReq->commentLen = -1;
}
if (pStmt->pOptions->keep > 0) {
pAlterReq->keep = pStmt->pOptions->keep;
}
return TSDB_CODE_SUCCESS;
}
@ -10615,6 +10653,9 @@ static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pS
if (TSDB_CODE_SUCCESS == code) {
code = checkAlterSuperTableBySchema(pCxt, pStmt, pTableMeta);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkTableKeepOption(pCxt, pStmt->pOptions, true);
}
taosMemoryFree(pTableMeta);
return code;
}
@ -15604,6 +15645,10 @@ static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClaus
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
"The table name cannot contain '.'");
}
if (pStmt->pOptions && (pStmt->pOptions->keep >= 0 || pStmt->pOptions->pKeepNode != NULL)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"child table cannot set keep duration");
}
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {
@ -17047,6 +17092,10 @@ static int32_t rewriteAlterTableImpl(STranslateContext* pCxt, SAlterTableStmt* p
} else if (TSDB_CHILD_TABLE != pTableMeta->tableType && TSDB_NORMAL_TABLE != pTableMeta->tableType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
if (pStmt->pOptions && (pStmt->pOptions->keep >= 0 || pStmt->pOptions->pKeepNode != NULL)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"only super table can alter keep duration");
}
const SSchema* pSchema = getNormalColSchema(pTableMeta, pStmt->colName);
if (hasPkInTable(pTableMeta) && pSchema && (pSchema->flags & COL_IS_KEY) &&

View File

@ -0,0 +1,132 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import random
import taos
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def prepare_database(self):
tdLog.info(f"prepare database")
tdSql.execute("DROP DATABASE IF EXISTS test")
tdSql.execute("CREATE DATABASE IF NOT EXISTS test")
tdSql.execute("USE test")
def check_create_stb_with_keep(self):
tdLog.info(f"check create stb with keep")
tdSql.execute("USE test")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_0 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1d")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_1 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1440m")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_2 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 24h")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_3 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 7d")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_4 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 30d")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_5 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 365000d")
tdSql.execute(f"CREATE STABLE IF NOT EXISTS stb_6 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 365")
def check_create_stb_with_err_keep_duration(self):
tdLog.info(f"check create stb with err keep duration")
tdSql.execute("USE test")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_7 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 0d",expectErrInfo="Invalid option keep value")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_8 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP -1d",expectErrInfo="syntax error")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_9 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP -1",expectErrInfo="syntax error")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_10 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1m",expectErrInfo="Invalid option keep value")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_11 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1h",expectErrInfo="Invalid option keep value")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_12 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 365001d",expectErrInfo="Invalid option keep value")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_13 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 365001",expectErrInfo="Invalid option keep value")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_14 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1f",expectErrInfo="syntax error")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_15 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1d1",expectErrInfo="syntax error")
tdSql.error(f"CREATE STABLE IF NOT EXISTS stb_16 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 21474836479999",expectErrInfo="Invalid option keep value")
def check_alter_stb_with_keep(self):
tdLog.info(f"check alter stb with keep")
tdSql.execute("USE test")
tdSql.execute(f"ALTER STABLE stb_0 KEEP 1440m")
tdSql.execute(f"ALTER STABLE stb_0 KEEP 24h")
tdSql.execute(f"ALTER STABLE stb_0 KEEP 7d")
tdSql.execute(f"ALTER STABLE stb_0 KEEP 30d")
tdSql.execute(f"ALTER STABLE stb_0 KEEP 365000d")
tdSql.execute(f"ALTER STABLE stb_0 KEEP 365")
def check_alter_stb_with_keep_err(self):
tdLog.info(f"check alter stb with keep err")
tdSql.execute("USE test")
tdSql.error(f"ALTER STABLE stb_0 KEEP 0d",expectErrInfo="Invalid option keep value")
tdSql.error(f"ALTER STABLE stb_0 KEEP -1d",expectErrInfo="syntax error")
tdSql.error(f"ALTER STABLE stb_0 KEEP -1",expectErrInfo="syntax error")
tdSql.error(f"ALTER STABLE stb_0 KEEP 1m",expectErrInfo="Invalid option keep value")
tdSql.error(f"ALTER STABLE stb_0 KEEP 1h",expectErrInfo="Invalid option keep value")
tdSql.error(f"ALTER STABLE stb_0 KEEP 365001d",expectErrInfo="Invalid option keep value")
tdSql.error(f"ALTER STABLE stb_0 KEEP 365001",expectErrInfo="Invalid option keep value")
tdSql.error(f"ALTER STABLE stb_0 KEEP 1f",expectErrInfo="syntax error")
tdSql.error(f"ALTER STABLE stb_0 KEEP 1d1",expectErrInfo="syntax error")
def check_child_table_with_keep(self):
tdLog.info(f"check child table with keep")
tdSql.execute("USE test")
tdSql.execute("CREATE DATABASE db")
tdSql.execute("USE db")
tdSql.execute("CREATE STABLE stb (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT) KEEP 1d")
tdSql.error(f"CREATE TABLE ctb USING stb TAGS (1) KEEP 1d",expectErrInfo="child table cannot set keep duration")
tdSql.execute(f"CREATE TABLE ctb USING stb TAGS (1)")
tdSql.error(f"ALTER TABLE ctb keep 1d",expectErrInfo="only super table can alter keep duration")
def check_normal_table_with_keep(self):
tdLog.info(f"check normal table with keep")
tdSql.execute("USE test")
tdSql.error("CREATE TABLE ntb (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) KEEP 1d",expectErrInfo="KEEP parameter is not allowed when creating normal table")
tdSql.execute("CREATE TABLE ntb (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10))")
tdSql.error("ALTER TABLE ntb keep 1d",expectErrInfo="only super table can alter keep duration")
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")
# prepare database
self.prepare_database()
# check create stb with keep
self.check_create_stb_with_keep()
# check create stb with err keep duration
self.check_create_stb_with_err_keep_duration()
# check alter stb with keep
self.check_alter_stb_with_keep()
# check alter stb with keep err
self.check_alter_stb_with_keep_err()
# check child table with keep
self.check_child_table_with_keep()
# check normal table with keep
self.check_normal_table_with_keep()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,226 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import datetime
import taos
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def prepare_database_and_data(self):
tdLog.info("===== Preparing database and tables for testing keep parameter =====")
# Create database with 7-day retention period
tdLog.info("Creating database with 7-day retention period")
tdSql.execute("DROP DATABASE IF EXISTS test_keep")
tdSql.execute("CREATE DATABASE test_keep DURATION 1 KEEP 7")
tdSql.execute("USE test_keep")
tdLog.info("Database created successfully")
# Create super table with 5-day retention period
tdLog.info("Creating super table with 5-day retention period")
tdSql.execute("CREATE STABLE stb_keep5 (ts TIMESTAMP, val INT) TAGS (t_id INT) KEEP 5d")
# Create super table with 2-day retention period
tdLog.info("Creating super table with 2-day retention period")
tdSql.execute("CREATE STABLE stb_keep2 (ts TIMESTAMP, val INT) TAGS (t_id INT) KEEP 2d")
# Create child tables
tdLog.info("Creating child tables")
tdSql.execute("CREATE TABLE tb_keep5_1 USING stb_keep5 TAGS(1)")
tdSql.execute("CREATE TABLE tb_keep2_1 USING stb_keep2 TAGS(1)")
# Get current timestamp
now = int(time.time() * 1000)
# Insert current data
tdLog.info("Inserting current data")
tdSql.execute(f"INSERT INTO tb_keep5_1 VALUES ({now}, 100)")
tdSql.execute(f"INSERT INTO tb_keep2_1 VALUES ({now}, 100)")
# Insert data from 1 day ago (relative to base_time)
day1_before = now - 24 * 3600 * 1000
tdLog.info("Inserting data from 1 day ago")
tdSql.execute(f"INSERT INTO tb_keep5_1 VALUES ({day1_before}, 90)")
tdSql.execute(f"INSERT INTO tb_keep2_1 VALUES ({day1_before}, 90)")
# Insert data from 3 days ago (relative to base_time)
day3_before = now - 3 * 24 * 3600 * 1000
tdLog.info("Inserting data from 3 days ago")
tdSql.execute(f"INSERT INTO tb_keep5_1 VALUES ({day3_before}, 70)")
tdSql.execute(f"INSERT INTO tb_keep2_1 VALUES ({day3_before}, 70)")
# Insert data from 6 days ago (relative to base_time)
day6_before = now - 6 * 24 * 3600 * 1000
tdLog.info("Inserting data from 6 days ago")
tdSql.execute(f"INSERT INTO tb_keep5_1 VALUES ({day6_before}, 40)")
tdSql.execute(f"INSERT INTO tb_keep2_1 VALUES ({day6_before}, 40)")
# Log the timestamps of inserted data points
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now/1000))
day1_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day1_before/1000))
day3_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day3_before/1000))
day6_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day6_before/1000))
tdLog.info(f"Inserted data points at: Base time ({current_time}), 1 day before ({day1_time}), 3 days before ({day3_time}), 6 days before ({day6_time})")
# Verify data was properly inserted
tdLog.info("Verifying all data was properly inserted (ignoring keep settings for now)")
tdSql.query("SELECT COUNT(*) FROM tb_keep5_1")
count1 = tdSql.getData(0, 0)
tdLog.info(f"tb_keep5_1 has {count1} rows inserted")
tdSql.query("SELECT COUNT(*) FROM tb_keep2_1")
count2 = tdSql.getData(0, 0)
tdLog.info(f"tb_keep2_1 has {count2} rows inserted")
def check_stb_keep_influences_query(self):
tdLog.info("===== Testing scenario 1: Super table keep parameter influences query results =====")
tdSql.execute("USE test_keep")
# Verify data visibility in stb_keep5
tdLog.info("Checking data visibility in stb_keep5 (keep=5d)")
tdSql.query("SELECT * FROM stb_keep5")
tdSql.checkRows(3)
tdLog.info("Checking oldest visible data in stb_keep5")
tdSql.query("SELECT val FROM stb_keep5 ORDER BY ts ASC")
tdSql.checkData(0, 0, 70)
# Verify data visibility in stb_keep2
tdLog.info("Checking data visibility in stb_keep2 (keep=2d)")
tdSql.query("SELECT * FROM stb_keep2")
tdSql.checkRows(2)
tdLog.info("Checking oldest visible data in stb_keep2")
tdSql.query("SELECT val FROM stb_keep2 ORDER BY ts ASC")
tdSql.checkData(0, 0, 90)
tdLog.info("Super table keep parameter successfully influences query results")
def prepare_db_keep_override_test(self):
tdLog.info("===== Preparing database and tables for testing DB keep parameter override =====")
# Create database with 1-day retention period
tdLog.info("Creating database with 1-day retention period")
tdSql.execute("DROP DATABASE IF EXISTS test_db_keep")
tdSql.execute("CREATE DATABASE test_db_keep DURATION 60m KEEP 1")
tdSql.execute("USE test_db_keep")
# Create super table with 7-day retention period
tdLog.info("Creating super table with 7-day retention period")
tdSql.execute("CREATE STABLE stb_keep7 (ts TIMESTAMP, val INT) TAGS (t_id INT) KEEP 7d")
# Create child table
tdLog.info("Creating child table")
tdSql.execute("CREATE TABLE tb_keep7_1 USING stb_keep7 TAGS(1)")
# Get current timestamp for data insertion
# We'll use the real current time to ensure we're within the keep window
now = int(time.time() * 1000)
# Insert current data
tdLog.info("Inserting current data")
tdSql.execute(f"INSERT INTO tb_keep7_1 VALUES ({now}, 100)")
# Insert data from 8 hours ago (safely within the 1-day keep period)
hours8_before = now - 8 * 3600 * 1000
tdLog.info("Inserting data from 8 hours ago (within DB keep=1d)")
tdSql.execute(f"INSERT INTO tb_keep7_1 VALUES ({hours8_before}, 90)")
# Log information about the timestamps
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now/1000))
hours8_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(hours8_before/1000))
tdLog.info(f"Inserted data points at: Current time ({current_time}), 8 hours ago ({hours8_time})")
# Verify data was properly inserted
tdSql.query("SELECT COUNT(*) FROM tb_keep7_1")
count = tdSql.getData(0, 0)
tdLog.info(f"tb_keep7_1 has {count} rows inserted")
# For demonstration purposes, calculate what the 2-day timestamp would be
# (we can't insert it, but we'll use it for our test explanation)
day2_before = now - 2 * 24 * 3600 * 1000
day2_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day2_before/1000))
tdLog.info(f"Note: Data from 2 days ago ({day2_time}) cannot be inserted because it exceeds the database keep=1d setting")
tdLog.info(f"We will verify the database keep value (1d) overrides the super table keep value (7d) in the next test")
def check_db_keep_overrides_stb_keep(self):
tdLog.info("===== Testing scenario 2: Database keep parameter overrides super table keep parameter =====")
tdSql.execute("USE test_db_keep")
# Verify the data we inserted is visible
tdLog.info("Checking data visibility in stb_keep7 (DB keep=1d, STB keep=7d)")
tdSql.query("SELECT COUNT(*) FROM stb_keep7")
count = tdSql.getData(0, 0)
tdLog.info(f"stb_keep7 returned {count} rows")
tdSql.checkEqual(count, 2) # Should see both recent records
# Attempt to demonstrate that DB keep overrides STB keep
tdLog.info("Verifying database keep (1d) overrides super table keep (7d):")
# Method 1: Check that trying to insert data beyond DB keep fails
tdLog.info("Method 1: Checking that inserting data beyond DB keep (1d) fails even though STB keep is 7d")
now = int(time.time() * 1000)
day2_before = now - 2 * 24 * 3600 * 1000
day2_query = f"INSERT INTO tb_keep7_1 VALUES ({day2_before}, 80)"
try:
# This should fail with "Timestamp data out of range" because it's beyond DB keep
tdSql.error(day2_query, expectErrInfo="Timestamp data out of range")
tdLog.info("Success: Database rejected data beyond keep period (1d) as expected")
except Exception as e:
tdLog.info(f"Test validation failed: {e}")
# Method 2: Verify we can't query data that would be valid under STB keep but invalid under DB keep
tdLog.info("Method 2: Verifying data from 2 days ago is not visible (if it existed)")
day2_time = time.strftime("%Y-%m-%d", time.localtime(day2_before/1000))
query = f"SELECT COUNT(*) FROM stb_keep7 WHERE ts <= '{day2_time} 23:59:59.999' AND ts >= '{day2_time} 00:00:00.000'"
tdSql.query(query)
count = tdSql.getData(0, 0)
tdLog.info(f"Found {count} rows for 2-day old data (expecting 0)")
tdSql.checkEqual(count, 0)
tdLog.info("Conclusion: Database keep parameter (1d) successfully overrides super table keep parameter (7d)")
# Run tests
def run(self):
tdLog.debug(f"Start to execute {__file__}")
# Prepare test data
self.prepare_database_and_data()
# Test scenario 1: Super table keep parameter influences query results
self.check_stb_keep_influences_query()
# Prepare test data for database keep override
self.prepare_db_keep_override_test()
# Test scenario 2: Database keep parameter overrides super table keep parameter
self.check_db_keep_overrides_stb_keep()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,708 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import datetime
import taos
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
"""
Test case to verify super table keep parameter behavior with compaction
This test verifies that:
1. Super table keep parameter only takes effect during compaction
2. Before compaction, all historical data is visible regardless of keep settings
3. After compaction, data older than the keep period is removed
4. Different combinations of database keep and super table keep behave as expected
"""
def prepare_database_with_keep(self, db_name, db_keep):
"""Create a database with specified keep value"""
tdLog.info(f"Creating database {db_name} with keep={db_keep}")
tdSql.execute(f"DROP DATABASE IF EXISTS {db_name}")
# Ensure duration is small enough to satisfy the rule keep > 3*duration
# Calculate the minimum legal duration (ensure keep > 3*duration)
max_legal_duration = int(db_keep / 3)
if max_legal_duration < 1:
# If keep is too small, use smaller time unit
# Use hours as unit, 1 day = 24 hours
duration_hours = 12 # Use 12 hours
tdLog.info(f"Setting duration={duration_hours}h to ensure keep({db_keep}) > 3*duration")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION {duration_hours}h KEEP {db_keep}")
else:
duration = max(1, max_legal_duration - 1) # Conservatively, use smaller value, ensure it's an integer and not zero
tdLog.info(f"Setting duration={duration}d to ensure keep({db_keep}) > 3*duration")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION {duration}d KEEP {db_keep}")
tdSql.execute(f"USE {db_name}")
return True
def create_super_table_with_keep(self, stb_name, keep_days):
"""Create a super table with specified keep value in days"""
tdLog.info(f"Creating super table {stb_name} with keep={keep_days}d")
create_sql = f"CREATE STABLE {stb_name} (ts TIMESTAMP, val INT) TAGS (t_id INT) KEEP {keep_days}d"
tdSql.execute(create_sql)
return True
def create_tables_and_insert_data(self, stb_name, table_prefix, table_count=1):
"""Create child tables and insert data at different time points"""
# Current time and historical data points
now = int(time.time() * 1000) # Get current time directly, not relying on self
day1_ts = now - 1 * 24 * 3600 * 1000 # 1 day ago
day3_ts = now - 3 * 24 * 3600 * 1000 # 3 days ago
day5_ts = now - 5 * 24 * 3600 * 1000 # 5 days ago
day7_ts = now - 7 * 24 * 3600 * 1000 # 7 days ago
for i in range(1, table_count + 1):
tb_name = f"{table_prefix}_{i}"
tdLog.info(f"Creating child table {tb_name} under {stb_name}")
tdSql.execute(f"CREATE TABLE {tb_name} USING {stb_name} TAGS({i})")
# Insert data at different time points
tdLog.info(f"Inserting data into {tb_name} at different time points")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({now}, 100)")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day1_ts}, 90)")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day3_ts}, 70)")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day5_ts}, 50)")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day7_ts}, 30)")
# Log timestamps for reference
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now/1000))
day1_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day1_ts/1000))
day3_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day3_ts/1000))
day5_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day5_ts/1000))
day7_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day7_ts/1000))
tdLog.info(f"Inserted data at: Current ({current_time}), 1 day ago ({day1_time}), " +
f"3 days ago ({day3_time}), 5 days ago ({day5_time}), 7 days ago ({day7_time})")
return {
"now": now,
"day1_ts": day1_ts,
"day3_ts": day3_ts,
"day5_ts": day5_ts,
"day7_ts": day7_ts,
"day1_time": day1_time,
"day3_time": day3_time,
"day5_time": day5_time,
"day7_time": day7_time
}
def create_tables_and_insert_data_within_days(self, stb_name, table_prefix, max_days, table_count=1):
"""Create child tables and insert data at different time points within specified max days"""
# Current time and historical data points
now = int(time.time() * 1000) # Get current time directly, not relying on self
day1_ts = now - 1 * 24 * 3600 * 1000 # 1 day ago
day3_ts = now - 3 * 24 * 3600 * 1000 # 3 days ago
day5_ts = now - 5 * 24 * 3600 * 1000 # 5 days ago
for i in range(1, table_count + 1):
tb_name = f"{table_prefix}_{i}"
tdLog.info(f"Creating child table {tb_name} under {stb_name}")
tdSql.execute(f"CREATE TABLE {tb_name} USING {stb_name} TAGS({i})")
# Insert data at different time points within max_days
tdLog.info(f"Inserting data into {tb_name} at different time points within {max_days} days")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({now}, 100)")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day1_ts}, 90)")
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day3_ts}, 70)")
# Only insert 5-day old data if max_days >= 5
if max_days >= 5:
tdSql.execute(f"INSERT INTO {tb_name} VALUES ({day5_ts}, 50)")
# Log timestamps for reference
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now/1000))
day1_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day1_ts/1000))
day3_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day3_ts/1000))
day5_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day5_ts/1000))
tdLog.info(f"Inserted data at: Current ({current_time}), 1 day ago ({day1_time}), " +
f"3 days ago ({day3_time})" +
(f", 5 days ago ({day5_time})" if max_days >= 5 else ""))
return {
"now": now,
"day1_ts": day1_ts,
"day3_ts": day3_ts,
"day5_ts": day5_ts if max_days >= 5 else None,
"day1_time": day1_time,
"day3_time": day3_time,
"day5_time": day5_time if max_days >= 5 else None
}
def verify_data_count(self, table_name, expected_count, msg=""):
"""Verify the count of rows in a table"""
tdSql.query(f"SELECT COUNT(*) FROM {table_name}")
actual_count = tdSql.getData(0, 0)
tdLog.info(f"{msg} - Expected: {expected_count}, Actual: {actual_count}")
tdSql.checkEqual(actual_count, expected_count)
def verify_oldest_data(self, table_name, expected_val, msg=""):
"""Verify the oldest data value in a table"""
tdSql.query(f"SELECT val FROM {table_name} ORDER BY ts ASC LIMIT 1")
actual_val = tdSql.getData(0, 0)
tdLog.info(f"{msg} - Expected oldest value: {expected_val}, Actual: {actual_val}")
tdSql.checkEqual(actual_val, expected_val)
def trigger_compact(self, db_name):
"""Trigger database compaction"""
tdLog.info(f"Triggering compaction for database {db_name}...")
tdLog.info(f"FLUSH DATABASE {db_name}")
tdSql.execute(f"FLUSH DATABASE {db_name}")
time.sleep(10)
# Correct syntax includes database name
tdSql.execute(f"COMPACT DATABASE {db_name}")
# Give system enough time to complete compaction
time.sleep(10)
tdLog.info(f"Compaction operation for {db_name} completed")
def test_case1_stb_keep_2_db_keep_10(self):
"""Test case 1: STB keep=2, DB keep=10 - STB keep should determine data retention after compact"""
tdLog.info("=== Test Case 1: STB keep=2, DB keep=10 ===")
# Setup
self.prepare_database_with_keep("test_stb_compact1", 10)
self.create_super_table_with_keep("stb_keep2", 2)
self.create_tables_and_insert_data("stb_keep2", "tb_case1")
# Verify data before compact
self.verify_data_count("stb_keep2", 5, "Before compact - all data should be visible")
self.verify_oldest_data("stb_keep2", 30, "Before compact - 7-day old data should be visible")
# Trigger compact
self.trigger_compact("test_stb_compact1")
# Verify data after compact
# With STB keep=2, data older than 2 days should be removed
self.verify_data_count("stb_keep2", 2, "After compact - only data within 2 days should remain")
self.verify_oldest_data("stb_keep2", 90, "After compact - oldest data should be from 1 day ago")
def test_case2_stb_keep_6_db_keep_4(self):
"""Test case 2: STB keep=6, DB keep=4 - DB keep should override STB keep"""
tdLog.info("=== Test Case 2: STB keep=6, DB keep=4 ===")
# Setup
# Modify database keep value to 5, ensuring it satisfies keep > 3*duration rule
# Even with duration minimum of 1, keep=5 can satisfy the condition
db_keep = 5
self.prepare_database_with_keep("test_stb_compact2", db_keep)
self.create_super_table_with_keep("stb_keep6", 6)
# Only insert data within db_keep-1 days to avoid going beyond database keep range
safe_days = db_keep - 1 # Safe margin to avoid boundary condition issues
self.create_tables_and_insert_data_within_days("stb_keep6", "tb_case2", safe_days)
# Verify data before compact
# If safe_days=4, we only have at most 4 data points (current, 1 day ago, 3 days ago, not including 5 days ago)
expected_count = 3 # current, 1 day ago, 3 days ago
self.verify_data_count("stb_keep6", expected_count, "Before compact - all data should be visible")
self.verify_oldest_data("stb_keep6", 70, "Before compact - 3-day old data should be visible")
# Trigger compact
self.trigger_compact("test_stb_compact2")
# Verify data after compact
# Database keep=5, STB keep=6, all data is within retention range, so no data should be deleted
self.verify_data_count("stb_keep6", expected_count, "After compact - all data should remain")
self.verify_oldest_data("stb_keep6", 70, "After compact - oldest data should still be from 3 days ago")
def test_case3_multiple_stbs_with_different_keep(self):
"""Test case 3: Multiple STBs with different keep values in same database"""
tdLog.info("=== Test Case 3: Multiple STBs with different keep values ===")
# Setup
self.prepare_database_with_keep("test_stb_compact3", 10)
self.create_super_table_with_keep("stb_keep2", 2)
self.create_super_table_with_keep("stb_keep4", 4)
self.create_super_table_with_keep("stb_keep8", 8)
self.create_tables_and_insert_data("stb_keep2", "tb_keep2")
self.create_tables_and_insert_data("stb_keep4", "tb_keep4")
self.create_tables_and_insert_data("stb_keep8", "tb_keep8")
# Verify data before compact
for stb in ["stb_keep2", "stb_keep4", "stb_keep8"]:
self.verify_data_count(stb, 5, f"Before compact - all data should be visible in {stb}")
# Trigger compact
self.trigger_compact("test_stb_compact3")
# Verify data after compact
# Each STB should retain data according to its keep value
self.verify_data_count("stb_keep2", 2, "After compact - stb_keep2 should keep 2 days of data")
self.verify_oldest_data("stb_keep2", 90, "After compact - stb_keep2 oldest data from 1 day ago")
self.verify_data_count("stb_keep4", 3, "After compact - stb_keep4 should keep 4 days of data")
self.verify_oldest_data("stb_keep4", 70, "After compact - stb_keep4 oldest data from 3 days ago")
self.verify_data_count("stb_keep8", 5, "After compact - stb_keep8 should keep all data (within 8 days)")
self.verify_oldest_data("stb_keep8", 30, "After compact - stb_keep8 oldest data from 7 days ago")
def test_case4_boundary_keep_duration_ratio(self):
"""Test case 4: Testing boundary condition where keep is slightly above 3*duration"""
tdLog.info("=== Test Case 4: Boundary keep/duration ratio ===")
# Create a database with keep=10, duration=3 (exactly satisfying keep > 3*duration)
db_name = "test_stb_compact4"
db_keep = 10
duration = 3 # days
tdLog.info(f"Creating database with boundary condition: keep={db_keep}, duration={duration}d")
tdSql.execute(f"DROP DATABASE IF EXISTS {db_name}")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION {duration}d KEEP {db_keep}")
tdSql.execute(f"USE {db_name}")
# Create STB with keep<5 to test extreme condition
self.create_super_table_with_keep("stb_keep3", 3)
self.create_tables_and_insert_data("stb_keep3", "tb_boundary_min")
# Create STB with data
self.create_super_table_with_keep("stb_keep7", 7)
self.create_tables_and_insert_data("stb_keep7", "tb_boundary")
# Verify data before compact
self.verify_data_count("stb_keep7", 5, "Before compact - all data should be visible")
self.verify_oldest_data("stb_keep7", 30, "Before compact - 7-day old data should be visible")
self.verify_data_count("stb_keep3", 5, "Before compact - all data should be visible in stb_keep3")
# Trigger compact
self.trigger_compact("test_stb_compact4")
# Verify data after compact
# Database keep=10, STB keep=7, STB keep should determine retention
self.verify_data_count("stb_keep7", 4, "After compact - data within 7 days should remain")
self.verify_oldest_data("stb_keep7", 50, "After compact - oldest data from 7 days ago should remain")
# Verify minimum keep value STB (keep=3)
self.verify_data_count("stb_keep3", 2, "After compact - only data within 3 days should remain")
self.verify_oldest_data("stb_keep3", 90, "After compact - oldest data should be from 3 days ago")
def test_case5_write_time_with_keep_restrictions(self):
"""Test case 5: Testing write behavior with keep restrictions"""
tdLog.info("=== Test Case 5: Write behavior with keep restrictions ===")
# Setup: database keep=8, STB keep values 3 and 10
db_name = "test_stb_write_keep"
db_keep = 8
# Create database
tdLog.info(f"Creating database with keep={db_keep}")
tdSql.execute(f"DROP DATABASE IF EXISTS {db_name}")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION 2d KEEP {db_keep}")
tdSql.execute(f"USE {db_name}")
# Create two super tables: one with keep value less than database, one with greater
tdLog.info("Creating super tables with different keep values")
self.create_super_table_with_keep("stb_keep3", 3) # keep value less than database
self.create_super_table_with_keep("stb_keep10", 10) # keep value greater than database
# Create child tables
tdLog.info("Creating child tables")
tdSql.execute("CREATE TABLE tb_keep3_1 USING stb_keep3 TAGS(1)")
tdSql.execute("CREATE TABLE tb_keep10_1 USING stb_keep10 TAGS(1)")
# Get current time and historical timestamps
now = int(time.time() * 1000)
day6_ts = now - 6 * 24 * 3600 * 1000 # 6 days ago
day9_ts = now - 9 * 24 * 3600 * 1000 # 9 days ago
day6_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day6_ts/1000))
day9_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day9_ts/1000))
tdLog.info(f"Current time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now/1000))}")
tdLog.info(f"Time 6 days ago: {day6_time}")
tdLog.info(f"Time 9 days ago: {day9_time}")
# Scenario 1: Timestamp within database keep range (8d) but beyond STB keep range (3d)
tdLog.info("Scenario 1: Timestamp within database keep (8d) but beyond stable keep (3d)")
# Should be able to insert successfully
tdSql.execute(f"INSERT INTO tb_keep3_1 VALUES ({day6_ts}, 40)")
tdLog.info("Successfully inserted data beyond stable keep but within database keep - CORRECT BEHAVIOR")
# Check if data was inserted successfully
tdSql.query(f"SELECT * FROM tb_keep3_1 WHERE ts = {day6_ts}")
if tdSql.queryRows == 1:
tdLog.info("Verified data was inserted successfully")
else:
tdLog.info("ERROR: Failed to verify inserted data")
# Optional: Run compact to see if data beyond STB keep but within DB keep is removed
self.trigger_compact(db_name)
# Check if data still exists after compact
tdSql.query(f"SELECT * FROM tb_keep3_1 WHERE ts = {day6_ts}")
if tdSql.queryRows == 0:
tdLog.info("After compact: Data beyond STB keep (3d) was removed as expected")
else:
tdLog.info("After compact: Data beyond STB keep (3d) was retained (unexpected)")
# Scenario 2: Timestamp beyond database keep range (8d)
tdLog.info("Scenario 2: Timestamp beyond database keep (8d)")
expected_error = "Timestamp data out of range" # Expected error message
insert_sql = f"INSERT INTO tb_keep3_1 VALUES ({day9_ts}, 10)"
# Use tdSql.error to check if expected error is raised
try:
# This insertion should fail because it exceeds database keep range
tdSql.error(insert_sql)
tdLog.info("Insertion beyond database keep was correctly rejected - EXPECTED ERROR")
except Exception as e:
tdLog.info(f"ERROR: Expected error was not raised: {str(e)}")
# Don't raise exception, allow test to continue
# Scenario 3: Try to insert data beyond database keep into STB with keep value > database keep
tdLog.info("Scenario 3: Timestamp beyond database keep (8d) for table with stable keep (10d)")
insert_sql = f"INSERT INTO tb_keep10_1 VALUES ({day9_ts}, 10)"
# Use tdSql.error to check if expected error is raised
tdSql.error(insert_sql, expectErrInfo="Timestamp data out of range")
tdLog.info("Insertion beyond database keep for table with larger stable keep was successful - NEW EXPECTED BEHAVIOR")
def test_case6_db_keep_8_stb_keep_4(self):
"""Test case 6: DB keep=8, STB keep=4 - Testing data insertion before and after compaction"""
tdLog.info("=== Test Case 6: DB keep=8, STB keep=4 - Data insertion behavior ===")
# Setup
db_name = "test_stb_compact6"
db_keep = 8
stb_keep = 4
# Create database and super table
tdLog.info(f"Creating database with keep={db_keep}")
tdSql.execute(f"DROP DATABASE IF EXISTS {db_name}")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION 2d KEEP {db_keep}")
tdSql.execute(f"USE {db_name}")
# Create super table with keep=4
tdLog.info(f"Creating super table with keep={stb_keep}d")
self.create_super_table_with_keep("stb_keep4", stb_keep)
# Create child table
tdLog.info("Creating child table")
tdSql.execute("CREATE TABLE tb_keep4_1 USING stb_keep4 TAGS(1)")
# Get current time and historical timestamps
now = int(time.time() * 1000)
day3_ts = now - 3 * 24 * 3600 * 1000 # 3 days ago
day7_ts = now - 7 * 24 * 3600 * 1000 # 7 days ago
day3_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day3_ts/1000))
day7_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(day7_ts/1000))
tdLog.info(f"Current time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now/1000))}")
tdLog.info(f"Time 3 days ago: {day3_time}")
tdLog.info(f"Time 7 days ago: {day7_time}")
# Step 1: Insert data from 3 days ago and 7 days ago
tdLog.info("Step 1: Inserting data from 3 days ago and 7 days ago")
tdSql.execute(f"INSERT INTO tb_keep4_1 VALUES ({now}, 100)")
tdSql.execute(f"INSERT INTO tb_keep4_1 VALUES ({day3_ts}, 70)")
tdSql.execute(f"INSERT INTO tb_keep4_1 VALUES ({day7_ts}, 30)")
# Verify initial insertion
tdSql.query("SELECT COUNT(*) FROM stb_keep4")
tdLog.info(f"Initial data count: {tdSql.getData(0, 0)}")
tdSql.checkEqual(tdSql.getData(0, 0), 3)
# Step 2: Flush and compact the database
tdLog.info("Step 2: Flushing and compacting the database")
self.trigger_compact(db_name)
# Step 3: Query after compaction
tdLog.info("Step 3: Querying data after compaction")
tdSql.query("SELECT COUNT(*) FROM stb_keep4")
count_after_compact = tdSql.getData(0, 0)
tdLog.info(f"Data count after compaction: {count_after_compact}")
# Check if data from 7 days ago (beyond STB keep=4) is removed
tdSql.query(f"SELECT * FROM stb_keep4 WHERE ts = {day7_ts}")
if tdSql.queryRows == 0:
tdLog.info("Data from 7 days ago was correctly removed after compaction")
else:
tdLog.info("ERROR: Data from 7 days ago was not removed as expected")
# Check if data from 3 days ago (within STB keep=4) is retained
tdSql.query(f"SELECT * FROM stb_keep4 WHERE ts = {day3_ts}")
if tdSql.queryRows == 1:
tdLog.info("Data from 3 days ago was correctly retained after compaction")
else:
tdLog.info("ERROR: Data from 3 days ago was unexpectedly removed")
# Step 4: Try to insert data from 7 days ago again (after compaction)
tdLog.info("Step 4: Inserting data from 7 days ago after compaction")
tdSql.execute(f"INSERT INTO tb_keep4_1 VALUES ({day7_ts}, 35)")
# Verify new insertion
tdSql.query(f"SELECT * FROM stb_keep4 WHERE ts = {day7_ts}")
if tdSql.queryRows == 1:
tdLog.info("Successfully inserted data from 7 days ago after compaction")
tdLog.info(f"Value: {tdSql.getData(0, 1)}")
else:
tdLog.info("ERROR: Failed to insert data from 7 days ago after compaction")
# Get total count after new insertion
tdSql.query("SELECT COUNT(*) FROM stb_keep4")
count_after_insert = tdSql.getData(0, 0)
tdLog.info(f"Data count after new insertion: {count_after_insert}")
# Expected count: retained count from before + 1 new record
expected_count = count_after_compact + 1
tdSql.checkEqual(count_after_insert, expected_count)
def test_case7_alter_stb_keep(self):
"""Test case 7: Test ALTER STABLE KEEP parameter and its effect on data retention after compaction"""
tdLog.info("=== Test Case 7: ALTER STABLE KEEP parameter ===")
# Setup
db_name = "test_stb_alter_keep"
db_keep = 10 # Set database keep to a higher value to allow flexible STB keep testing
initial_stb_keep = 3 # Initial keep value
# Create database and super table with initial keep value
tdLog.info(f"Creating database with keep={db_keep}")
tdSql.execute(f"DROP DATABASE IF EXISTS {db_name}")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION 2d KEEP {db_keep}")
tdSql.execute(f"USE {db_name}")
# Create super table with initial keep
tdLog.info(f"Creating super table with initial keep={initial_stb_keep}d")
self.create_super_table_with_keep("stb_alter_keep", initial_stb_keep)
# Create child table and insert data with safer time margins
tdLog.info("Creating child table and inserting data")
# For safety, we'll insert data with specific values that are clearly within boundaries
now = int(time.time() * 1000) # Current time in milliseconds
# Add margin to ensure day calculations don't fall on boundary
# Subtract a few hours from each day boundary for safety
margin_hours = 4 # 4 hours safety margin
margin_ms = margin_hours * 3600 * 1000 # Convert to milliseconds
# Calculate timestamps with safety margins
day1_ts = now - (1 * 24 * 3600 * 1000) - margin_ms # ~1.2 days ago
day2_ts = now - (2 * 24 * 3600 * 1000) - margin_ms # ~2.2 days ago
day4_ts = now - (4 * 24 * 3600 * 1000) - margin_ms # ~4.2 days ago
day6_ts = now - (6 * 24 * 3600 * 1000) - margin_ms # ~6.2 days ago
# Create table and insert data
tdSql.execute("CREATE TABLE tb_alter_keep_1 USING stb_alter_keep TAGS(1)")
# Insert data at different time points
tdLog.info("Inserting data at different time points with safety margins")
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({now}, 100)") # Current
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day1_ts}, 90)") # ~1.2 days ago
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day2_ts}, 80)") # ~2.2 days ago
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day4_ts}, 60)") # ~4.2 days ago
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day6_ts}, 40)") # ~6.2 days ago
# Log the timestamps for debugging
tdLog.info(f"Current time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now/1000))}")
tdLog.info(f"~1.2 days ago: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(day1_ts/1000))}")
tdLog.info(f"~2.2 days ago: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(day2_ts/1000))}")
tdLog.info(f"~4.2 days ago: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(day4_ts/1000))}")
tdLog.info(f"~6.2 days ago: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(day6_ts/1000))}")
# Verify initial data insertion - all data should be visible
tdSql.query("SELECT COUNT(*) FROM stb_alter_keep")
initial_count = tdSql.getData(0, 0)
tdLog.info(f"Initial data count: {initial_count}")
tdSql.checkEqual(initial_count, 5)
# Create a timestamp map for later use
timestamps = {
"now": now,
"day1_ts": day1_ts,
"day2_ts": day2_ts,
"day4_ts": day4_ts,
"day6_ts": day6_ts,
}
# Perform first compaction with initial keep value
tdLog.info(f"Performing first compaction with STB keep={initial_stb_keep}")
self.trigger_compact(db_name)
# Verify data after first compaction - data older than initial_stb_keep should be removed
# With keep=3, and our safety margin, we expect data within ~2.9 days to be kept
# This should definitely include current, day1_ts, day2_ts, but not day4_ts or day6_ts
tdSql.query("SELECT COUNT(*) FROM stb_alter_keep")
count_after_first_compact = tdSql.getData(0, 0)
tdLog.info(f"Data count after first compaction: {count_after_first_compact}")
# Check individual records to see what was preserved
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {now}")
tdSql.checkEqual(tdSql.queryRows, 1)
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {day1_ts}")
tdSql.checkEqual(tdSql.queryRows, 1)
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {day2_ts}")
tdSql.checkEqual(tdSql.queryRows, 1)
tdSql.query(f"SELECT COUNT(*) FROM stb_alter_keep")
tdSql.checkEqual(tdSql.getData(0, 0), 3)
# Expected count should be the sum of records actually preserved
expected_preserved_count = 3
# Increase keep value
new_keep_value = 6 # Increase to 6 days
tdLog.info(f"Altering STB keep value from {initial_stb_keep} to {new_keep_value}")
tdSql.execute(f"ALTER STABLE stb_alter_keep KEEP {new_keep_value}d")
# Re-insert data at 4.2 days ago that was likely deleted in first compaction
tdLog.info("Re-inserting data from ~4.2 days ago")
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day4_ts}, 65)") # Different value to track
# Verify data after insertion but before second compaction
tdSql.query("SELECT COUNT(*) FROM stb_alter_keep")
count_after_reinsertion = tdSql.getData(0, 0)
tdLog.info(f"Data count after reinsertion: {count_after_reinsertion}")
expected_count_after_reinsertion = expected_preserved_count + 1 # +1 for the reinserted record
tdSql.checkEqual(count_after_reinsertion, expected_count_after_reinsertion)
# Perform second compaction with new keep value
tdLog.info(f"Performing second compaction with increased STB keep={new_keep_value}")
self.trigger_compact(db_name)
# Verify data after second compaction
tdSql.query("SELECT COUNT(*) FROM stb_alter_keep")
count_after_second_compact = tdSql.getData(0, 0)
tdLog.info(f"Data count after second compaction: {count_after_second_compact}")
tdSql.checkEqual(count_after_second_compact, 4)
# Verify the re-inserted data (day4) is retained after second compaction
tdSql.query(f"SELECT val FROM stb_alter_keep WHERE ts = {day4_ts}")
tdSql.checkEqual(tdSql.queryRows, 1)
tdSql.checkEqual(tdSql.getData(0, 0), 65)
# Check if day6 data was either never inserted or was correctly removed
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {day6_ts}")
tdSql.checkEqual(tdSql.queryRows, 0)
def test_case8_stb_keep_compact_with_db_keep(self):
"""Test case 8: Test STB keep compact with database keep"""
tdLog.info("=== Test Case 8: STB keep compact with database keep ===")
# Setup
db_name = "test_stb_alter_keep"
db_keep = 10 # Set database keep to a higher value to allow flexible STB keep testing
initial_stb_keep = 3 # Initial keep value
# Create database and super table with initial keep value
tdLog.info(f"Creating database with keep={db_keep}")
tdSql.execute(f"DROP DATABASE IF EXISTS {db_name}")
tdSql.execute(f"CREATE DATABASE {db_name} DURATION 2d KEEP {db_keep}")
tdSql.execute(f"USE {db_name}")
# Create super table with initial keep
tdLog.info(f"Creating super table with initial keep={initial_stb_keep}d")
self.create_super_table_with_keep("stb_alter_keep", initial_stb_keep)
# Create child table and insert data with safer time margins
tdLog.info("Creating child table and inserting data")
# For safety, we'll insert data with specific values that are clearly within boundaries
now = int(time.time() * 1000) # Current time in milliseconds
# Add margin to ensure day calculations don't fall on boundary
# Subtract a few hours from each day boundary for safety
margin_hours = 4 # 4 hours safety margin
margin_ms = margin_hours * 3600 * 1000 # Convert to milliseconds
# Calculate timestamps with safety margins
day0_ts = now - (0 * 24 * 3600 * 1000) - margin_ms # ~0.2 days ago
day1_ts = now - (1 * 24 * 3600 * 1000) - margin_ms # ~1.2 days ago
day2_ts = now - (2 * 24 * 3600 * 1000) - margin_ms # ~2.2 days ago
day4_ts = now - (4 * 24 * 3600 * 1000) - margin_ms # ~4.2 days ago
day6_ts = now - (6 * 24 * 3600 * 1000) - margin_ms # ~6.2 days ago
# Create table and insert data
tdSql.execute("CREATE TABLE tb_alter_keep_1 USING stb_alter_keep TAGS(1)")
# Insert data at different time points
tdLog.info("Inserting data at different time points with safety margins")
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({now}, 100)") # Current
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day1_ts}, 90)") # ~1.2 days ago
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day2_ts}, 80)") # ~2.2 days ago
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day4_ts}, 60)") # ~4.2 days ago
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day6_ts}, 40)") # ~6.2 days ago
# Decrease keep value to less than original
final_keep_value = 2 # Decrease to 2 days (less than original)
tdLog.info(f"Altering STB keep value from {initial_stb_keep} to {final_keep_value}")
tdSql.execute(f"ALTER STABLE stb_alter_keep KEEP {final_keep_value}d")
tdSql.execute(f"INSERT INTO tb_alter_keep_1 VALUES ({day0_ts}, 85)")
# Perform third compaction with reduced keep value
tdLog.info(f"Performing third compaction with decreased STB keep={final_keep_value}")
self.trigger_compact(db_name)
# Verify data after third compaction
tdSql.query("SELECT COUNT(*) FROM stb_alter_keep")
count_after_third_compact = tdSql.getData(0, 0)
tdLog.info(f"Data count after third compaction: {count_after_third_compact}")
# Check individual records to see what was preserved
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {now}")
tdSql.checkEqual(tdSql.queryRows, 1)
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {day0_ts}")
tdSql.checkEqual(tdSql.queryRows, 1)
tdSql.query(f"SELECT * FROM stb_alter_keep WHERE ts = {day1_ts}")
tdSql.checkEqual(tdSql.queryRows, 1)
# Expected count should be the sum of records actually preserved
expected_preserved_count = 3
tdSql.query(f"SELECT COUNT(*) FROM stb_alter_keep")
tdSql.checkEqual(tdSql.getData(0, 0), expected_preserved_count)
def run(self):
tdLog.debug(f"Start to execute {__file__}")
# 1. Test case 1: STB keep=2, DB keep=10
self.test_case1_stb_keep_2_db_keep_10()
# 2. Test case 2: STB keep=6, DB keep=4
self.test_case2_stb_keep_6_db_keep_4()
# 3. Test case 3: Multiple STBs with different keep values
self.test_case3_multiple_stbs_with_different_keep()
# 4. Test case 4: Boundary keep duration ratio
self.test_case4_boundary_keep_duration_ratio()
# 5. Test case 5: Write time with keep restrictions
self.test_case5_write_time_with_keep_restrictions()
# 6. Test case 6: DB keep=8, STB keep=4
self.test_case6_db_keep_8_stb_keep_4()
# 7. Test case 7: ALTER STABLE KEEP parameter
self.test_case7_alter_stb_keep()
# 8. Test case 8: STB keep compact with database keep
self.test_case8_stb_keep_compact_with_db_keep()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -75,6 +75,11 @@
,,n,army,python3 ./test.py -f tmq/drop_lost_comsumers.py
,,y,army,./pytest.sh python3 ./test.py -f cmdline/taosCli.py -B
,,n,army,python3 ./test.py -f whole/checkErrorCode.py
,,y,army,./pytest.sh python3 ./test.py -f create/create_stb_keep.py
,,y,army,./pytest.sh python3 ./test.py -f create/create_stb_keep.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py
,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f create/test_stb_keep_compact.py -N 3 -M 3
#
# army/tools