diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index f5e0378a00..506a8dcc46 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -81,10 +81,6 @@ Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; -void commitAsync(); - -void commitAsync(OffsetCommitCallback callback); - void commitSync() throws SQLException; void close() throws SQLException; diff --git a/docs/en/12-taos-sql/22-meta.md b/docs/en/12-taos-sql/22-meta.md index 4123bdfb58..f165470d10 100644 --- a/docs/en/12-taos-sql/22-meta.md +++ b/docs/en/12-taos-sql/22-meta.md @@ -283,6 +283,8 @@ Provides dnode configuration information. | 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 3 | vgroup_id | INT | Vgroup ID for the consumer | | 4 | consumer_id | BIGINT | Consumer ID | +| 5 | offset | BINARY(64) | Consumption progress | +| 6 | rows | BIGINT | Number of consumption items | ## INS_STREAMS diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index e8c407b125..b68aeda94c 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java. | taos-jdbcdriver version | major changes | TDengine version | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | +| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later | | 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later | -| 3.2.2 | subscription add seek function | 3.0.5.0 or later | +| 3.2.2 | Subscription add seek function | 3.0.5.0 or later | | 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | | 3.2.0 | This version has been deprecated | - | | 3.1.0 | JDBC REST connection supports subscription over WebSocket | - | | 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - | | 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later | -| 2.0.42 | fix wasNull interface return value in WebSocket connection | - | -| 2.0.41 | fix decode method of username and password in REST connection | - | +| 2.0.42 | Fix wasNull interface return value in WebSocket connection | - | +| 2.0.41 | Fix decode method of username and password in REST connection | - | | 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - | | 2.0.38 | JDBC REST connections add bulk pull function | - | | 2.0.37 | Support json tags | - | diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 54a8af2287..38b91d7cea 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -81,10 +81,6 @@ Set subscription() throws SQLException; ConsumerRecords poll(Duration timeout) throws SQLException; -void commitAsync(); - -void commitAsync(OffsetCommitCallback callback); - void commitSync() throws SQLException; void close() throws SQLException; diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index c7da2bd4f5..96f8991eea 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。 | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | +| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - | | 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - | | 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 | | 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 | diff --git a/docs/zh/12-taos-sql/22-meta.md b/docs/zh/12-taos-sql/22-meta.md index 3fffbd0706..fe8d6d4c69 100644 --- a/docs/zh/12-taos-sql/22-meta.md +++ b/docs/zh/12-taos-sql/22-meta.md @@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 4 | consumer_id | BIGINT | 消费者的唯一 id | +| 5 | offset | BINARY(64) | 消费者的消费进度 | +| 6 | rows | BIGINT | 消费者的消费的数据条数 | ## INS_STREAMS diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6e182c1c35..126da5b4e8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3025,6 +3025,7 @@ typedef struct { char* sql; char* ast; int64_t deleteMark; + int64_t lastTs; } SMCreateSmaReq; int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 3ac971344b..bd0b70c310 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -319,19 +319,22 @@ typedef struct SIndexOptions { SNode* pInterval; SNode* pOffset; SNode* pSliding; + int8_t tsPrecision; SNode* pStreamOptions; } SIndexOptions; typedef struct SCreateIndexStmt { - ENodeType type; - EIndexType indexType; - bool ignoreExists; - char indexDbName[TSDB_DB_NAME_LEN]; - char indexName[TSDB_INDEX_NAME_LEN]; - char dbName[TSDB_DB_NAME_LEN]; - char tableName[TSDB_TABLE_NAME_LEN]; - SNodeList* pCols; - SIndexOptions* pOptions; + ENodeType type; + EIndexType indexType; + bool ignoreExists; + char indexDbName[TSDB_DB_NAME_LEN]; + char indexName[TSDB_INDEX_NAME_LEN]; + char dbName[TSDB_DB_NAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + SNodeList* pCols; + SIndexOptions* pOptions; + SNode* pPrevQuery; + SMCreateSmaReq* pReq; } SCreateIndexStmt; typedef struct SDropIndexStmt { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 772a668f0f..0cd73f2d9a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -706,6 +706,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666) #define TSDB_CODE_PAR_INVALID_OPTR_USAGE TAOS_DEF_ERROR_CODE(0, 0x2667) #define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2668) +#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2669) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index debb93e8ba..adb3dd48c6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -835,6 +835,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1; } if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1; + if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -884,6 +885,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; } if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index bf3b897c6f..a3d3ceab24 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -79,16 +79,18 @@ typedef struct { TXN* pTxn; } STtlDelTtlCtx; -int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); -int ttlMgrClose(STtlManger* pTtlMgr); -int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta); +int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); +void ttlMgrClose(STtlManger* pTtlMgr); +int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); -int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta); -int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); +bool ttlMgrNeedUpgrade(TDB* pEnv); +int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta); int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx); int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); + +int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6b403ca498..efe5f358bf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -136,6 +136,7 @@ typedef struct STbUidStore STbUidStore; #define META_BEGIN_HEAP_NIL 2 int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); +int metaUpgrade(SVnode* pVnode, SMeta** ppMeta); int metaClose(SMeta** pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); TXN* metaGetTxn(SMeta* pMeta); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 1fa5b9c1e9..d262567953 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -40,10 +40,6 @@ int metaBegin(SMeta *pMeta, int8_t heap) { return -1; } - if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) { - return -1; - } - tdbCommit(pMeta->pEnv, pMeta->txn); return 0; diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index fb17aff318..511cc8d6ec 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -29,6 +29,8 @@ static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } +static void metaCleanup(SMeta **ppMeta); + int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { SMeta *pMeta = NULL; int ret; @@ -180,51 +182,43 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { return 0; _err: - if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); - if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); - if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); - if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); - if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); - if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); - if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); - if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); - if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); - if (pMeta->pEnv) tdbClose(pMeta->pEnv); - metaDestroyLock(pMeta); - taosMemoryFree(pMeta); + metaCleanup(&pMeta); return -1; } -int metaClose(SMeta **ppMeta) { +int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) { + int code = TSDB_CODE_SUCCESS; SMeta *pMeta = *ppMeta; - if (pMeta) { - if (pMeta->pEnv) metaAbort(pMeta); - if (pMeta->pCache) metaCacheClose(pMeta); - if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); - if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); - if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); - if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); - if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); - if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); - if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); - if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); - if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); - if (pMeta->pEnv) tdbClose(pMeta->pEnv); - metaDestroyLock(pMeta); - taosMemoryFreeClear(*ppMeta); + if (ttlMgrNeedUpgrade(pMeta->pEnv)) { + code = metaBegin(pMeta, META_BEGIN_HEAP_OS); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + code = metaCommit(pMeta, pMeta->txn); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } } + return TSDB_CODE_SUCCESS; + +_err: + metaCleanup(ppMeta); + return code; +} + +int metaClose(SMeta **ppMeta) { + metaCleanup(ppMeta); return 0; } @@ -270,6 +264,32 @@ int32_t metaULock(SMeta *pMeta) { return ret; } +static void metaCleanup(SMeta **ppMeta) { + SMeta *pMeta = *ppMeta; + if (pMeta) { + if (pMeta->pEnv) metaAbort(pMeta); + if (pMeta->pCache) metaCacheClose(pMeta); + if (pMeta->pIdx) metaCloseIdx(pMeta); + if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); + if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); + if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); + if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); + if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); + if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); + if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); + if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); + if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); + if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); + if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); + if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); + if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); + if (pMeta->pEnv) tdbClose(pMeta->pEnv); + metaDestroyLock(pMeta); + + taosMemoryFreeClear(*ppMeta); + } +} + static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1; STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index af4827a9c7..c6cb826149 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -21,6 +21,10 @@ typedef struct { SMeta *pMeta; } SConvertData; +static void ttlMgrCleanup(STtlManger *pTtlMgr); + +static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta); + static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid); static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); @@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { - int ret; + int ret = TSDB_CODE_SUCCESS; + int64_t startNs = taosGetTimestampNs(); *ppTtlMgr = NULL; STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); - if (pTtlMgr == NULL) { - return -1; - } - - if (tdbTbExist(ttlTbname, pEnv)) { - ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback); - if (ret < 0) { - metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); - return ret; - } - } + if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY; ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); - tdbOsFree(pTtlMgr); return ret; } @@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { taosThreadRwlockInit(&pTtlMgr->lock, NULL); + ret = ttlMgrFillCache(pTtlMgr); + if (ret < 0) { + metaError("failed to fill hash since %s", tstrerror(terrno)); + ttlMgrCleanup(pTtlMgr); + return ret; + } + + int64_t endNs = taosGetTimestampNs(); + metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + endNs - startNs); + *ppTtlMgr = pTtlMgr; - return 0; + return TSDB_CODE_SUCCESS; } -int ttlMgrClose(STtlManger *pTtlMgr) { - taosHashCleanup(pTtlMgr->pTtlCache); - taosHashCleanup(pTtlMgr->pDirtyUids); - tdbTbClose(pTtlMgr->pTtlIdx); - taosThreadRwlockDestroy(&pTtlMgr->lock); - tdbOsFree(pTtlMgr); - return 0; +void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); } + +bool ttlMgrNeedUpgrade(TDB *pEnv) { + bool needUpgrade = tdbTbExist(ttlTbname, pEnv); + if (needUpgrade) { + metaInfo("find ttl idx in old version , will convert"); + } + return needUpgrade; } -int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { - metaInfo("ttl mgr start open"); - int ret; +int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) { + SMeta *meta = (SMeta *)pMeta; + int ret = TSDB_CODE_SUCCESS; + + if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS; + + metaInfo("ttl mgr start upgrade"); int64_t startNs = taosGetTimestampNs(); - SMeta *meta = (SMeta *)pMeta; + ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0); + if (ret < 0) { + metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); + goto _out; + } - if (pTtlMgr->pOldTtlIdx) { - ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); - if (ret < 0) { - metaError("failed to convert ttl index since %s", tstrerror(terrno)); - goto _out; - } + ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); + if (ret < 0) { + metaError("failed to convert ttl index since %s", tstrerror(terrno)); + goto _out; + } - ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); - if (ret < 0) { - metaError("failed to drop old ttl index since %s", tstrerror(terrno)); - goto _out; - } - - tdbTbClose(pTtlMgr->pOldTtlIdx); - pTtlMgr->pOldTtlIdx = NULL; + ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); + if (ret < 0) { + metaError("failed to drop old ttl index since %s", tstrerror(terrno)); + goto _out; } ret = ttlMgrFillCache(pTtlMgr); @@ -111,13 +120,23 @@ int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { } int64_t endNs = taosGetTimestampNs(); - - metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); _out: + tdbTbClose(pTtlMgr->pOldTtlIdx); + pTtlMgr->pOldTtlIdx = NULL; + return ret; } +static void ttlMgrCleanup(STtlManger *pTtlMgr) { + taosHashCleanup(pTtlMgr->pTtlCache); + taosHashCleanup(pTtlMgr->pDirtyUids); + tdbTbClose(pTtlMgr->pTtlIdx); + taosThreadRwlockDestroy(&pTtlMgr->lock); + tdbOsFree(pTtlMgr); +} + static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { if (ttlDays <= 0) return; @@ -205,7 +224,7 @@ _out: return ret; } -int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { +static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { SMeta *meta = pMeta; metaInfo("ttlMgr convert ttl start."); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 64546bb4a1..8967e9dc62 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -76,7 +76,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p } SSyncCfg *pCfg = &info.config.syncCfg; - + pCfg->replicaNum = 0; pCfg->totalReplicaNum = 0; memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); @@ -109,7 +109,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; } - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); info.config.syncCfg = *pCfg; @@ -372,6 +372,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } + if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) { + vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno)); + } + // open tsdb if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 15232b95b6..c8197721fb 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -907,6 +907,10 @@ void nodesDestroyNode(SNode* pNode) { SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode; nodesDestroyNode((SNode*)pStmt->pOptions); nodesDestroyList(pStmt->pCols); + if (pStmt->pReq) { + tFreeSMCreateSmaReq(pStmt->pReq); + taosMemoryFreeClear(pStmt->pReq); + } break; } case QUERY_NODE_DROP_INDEX_STMT: // no pointer field @@ -1053,6 +1057,7 @@ void nodesDestroyNode(SNode* pNode) { } case QUERY_NODE_QUERY: { SQuery* pQuery = (SQuery*)pNode; + nodesDestroyNode(pQuery->pPrevRoot); nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pPostRoot); taosMemoryFreeClear(pQuery->pResSchema); diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index d79aa84bb8..69253e62e2 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -35,6 +35,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); +int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8fc4be5f95..6f7f052158 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3520,6 +3520,10 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; } + if (pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE && + ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType == TSDB_SYSTEM_TABLE) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "WINDOW"); + } pCxt->currClause = SQL_CLAUSE_WINDOW; int32_t code = translateExpr(pCxt, &pSelect->pWindow); if (TSDB_CODE_SUCCESS == code) { @@ -5803,6 +5807,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm if (TSDB_CODE_SUCCESS == code) { code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); } + if (TSDB_CODE_SUCCESS == code) { + STableMeta* pMetaCache = NULL; + code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision; + code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery); + } + taosMemoryFreeClear(pMetaCache); + } return code; } @@ -5828,15 +5841,60 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS } static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) { - SMCreateSmaReq createSmaReq = {0}; int32_t code = checkCreateSmaIndex(pCxt, pStmt); + pStmt->pReq = taosMemoryCalloc(1, sizeof(SMCreateSmaReq)); + if (pStmt->pReq == NULL) code = TSDB_CODE_OUT_OF_MEMORY; if (TSDB_CODE_SUCCESS == code) { - code = buildCreateSmaReq(pCxt, pStmt, &createSmaReq); + code = buildCreateSmaReq(pCxt, pStmt, pStmt->pReq); + } + TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery); + return code; +} + +int32_t createIntervalFromCreateSmaIndexStmt(SCreateIndexStmt* pStmt, SInterval* pInterval) { + pInterval->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i; + pInterval->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit; + pInterval->offset = NULL != pStmt->pOptions->pOffset ? ((SValueNode*)pStmt->pOptions->pOffset)->datum.i : 0; + pInterval->sliding = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pInterval->interval; + pInterval->slidingUnit = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pInterval->intervalUnit; + pInterval->precision = pStmt->pOptions->tsPrecision; + return TSDB_CODE_SUCCESS; +} + +int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void ** pResRow) { + int32_t code = TSDB_CODE_SUCCESS; + SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot; + int64_t lastTs = 0; + SInterval interval = {0}; + STranslateContext pCxt = {0}; + code = initTranslateContext(pParseCxt, NULL, &pCxt); + if (TSDB_CODE_SUCCESS == code) { + code = createIntervalFromCreateSmaIndexStmt(pStmt, &interval); } if (TSDB_CODE_SUCCESS == code) { - code = buildCmdMsg(pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, &createSmaReq); + if (pResRow && pResRow[0]) { + lastTs = *(int64_t*)pResRow[0]; + } else if (interval.interval > 0) { + lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision); + } else { + lastTs = taosGetTimestampMs(); + } } - tFreeSMCreateSmaReq(&createSmaReq); + if (TSDB_CODE_SUCCESS == code) { + if (interval.interval > 0) { + pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval); + } else { + pStmt->pReq->lastTs = lastTs; + } + code = buildCmdMsg(&pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq); + } + if (TSDB_CODE_SUCCESS == code) { + code = setQuery(&pCxt, pQuery); + } + setRefreshMate(&pCxt, pQuery); + destroyTranslateContext(&pCxt); + tFreeSMCreateSmaReq(pStmt->pReq); + taosMemoryFreeClear(pStmt->pReq); return code; } @@ -6989,7 +7047,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* return code; } -int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) { +static int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) { int32_t code = TSDB_CODE_SUCCESS; if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) { return code; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index f82d56ac56..263318b92f 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -172,6 +172,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function is not supported in group query"; case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC: return "%s function is not supported in system table query"; + case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED: + return "%s is not supported in system table query"; case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN: diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index cbddaf8115..10fda8741b 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -227,6 +227,8 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pRes case QUERY_NODE_CREATE_STREAM_STMT: code = translatePostCreateStream(pCxt, pQuery, pResRow); break; + case QUERY_NODE_CREATE_INDEX_STMT: + code = translatePostCreateSmaIndex(pCxt, pQuery, pResRow); default: break; } diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 6d27bb0d29..856fdb4804 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -542,6 +542,18 @@ TEST_F(ParserInitialCTest, createSmaIndex) { setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT); SMCreateSmaReq req = {0}; + ASSERT_TRUE(pQuery->pPrevRoot); + ASSERT_EQ(QUERY_NODE_SELECT_STMT, nodeType(pQuery->pPrevRoot)); + + SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot; + SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo)); + if (NULL == pCmdMsg) FAIL(); + pCmdMsg->msgType = TDMT_MND_CREATE_SMA; + pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq); + pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen); + if (!pCmdMsg->pMsg) FAIL(); + tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq); + ((SQuery*)pQuery)->pCmdMsg = pCmdMsg; ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_EQ(std::string(req.name), std::string(expect.name)); diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index cddd2aa8f7..937f76176e 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -291,4 +291,13 @@ TEST_F(ParserInitialDTest, dropUser) { run("DROP USER wxy"); } +TEST_F(ParserInitialDTest, IntervalOnSysTable) { + login("root"); + run("SELECT count('reboot_time') FROM information_schema.ins_dnodes interval(14m) sliding(9m)", + TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, PARSER_STAGE_TRANSLATE); + + run("SELECT count('create_time') FROM information_schema.ins_qnodes interval(14m) sliding(9m)", + TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, PARSER_STAGE_TRANSLATE); +} + } // namespace ParserTest diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index d89e669a90..3b432b9890 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -441,6 +441,16 @@ class PlannerTestBaseImpl { pCxt->topicQuery = true; } else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) { SMCreateSmaReq req = {0}; + SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot; + SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo)); + if (NULL == pCmdMsg) FAIL(); + pCmdMsg->msgType = TDMT_MND_CREATE_SMA; + pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq); + pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen); + if (!pCmdMsg->pMsg) FAIL(); + tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq); + ((SQuery*)pQuery)->pCmdMsg = pCmdMsg; + tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); g_mockCatalogService->createSmaIndex(&req); nodesStringToNode(req.ast, &pCxt->pAstRoot); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bcb479e71e..d0d63215e6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); - - // todo handle stream task is dropped here + if (pStreamTask == NULL) { + qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", + pTask->id.idStr, pTask->streamTaskId.taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } else { + qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + } ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -377,7 +381,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", + ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); } else { @@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return 0; + } } break; @@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) { if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); - if (code < 0) { + if (code < 0) { // todo this status shoudl be removed atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index c49b5726b6..08e61c2272 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -233,7 +233,11 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) { int ret; tdbBtcOpen(&btc, pBt, pTxn); - + /* + btc.coder.ofps = taosArrayInit(8, sizeof(SPage *)); + // btc.coder.ofps = taosArrayInit(8, sizeof(SPgno)); + //pBtc->coder.ofps = taosArrayInit(8, sizeof(SPage *)); + */ tdbTrace("tdb delete, btc: %p, pTxn: %p", &btc, pTxn); // move the cursor @@ -254,7 +258,18 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) { tdbBtcClose(&btc); return -1; } + /* + SArray *ofps = btc.coder.ofps; + if (ofps) { + for (int i = 0; i < TARRAY_SIZE(ofps); ++i) { + SPage *ofp = *(SPage **)taosArrayGet(ofps, i); + tdbPagerInsertFreePage(btc.pBt->pPager, ofp, btc.pTxn); + } + taosArrayDestroy(ofps); + btc.coder.ofps = NULL; + } + */ tdbBtcClose(&btc); return 0; } @@ -563,6 +578,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx } } // copy the parent key out if child pages are not leaf page + // childNotLeaf = !(TDB_BTREE_PAGE_IS_LEAF(pOlds[0]) || TDB_BTREE_PAGE_IS_OVFL(pOlds[0])); childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]); if (childNotLeaf) { for (int i = 0; i < nOlds; i++) { @@ -592,7 +608,30 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx for (int i = 0; i < nOlds; i++) { nCells = TDB_PAGE_TOTAL_CELLS(pParent); if (sIdx < nCells) { + bool destroyOfps = false; + if (!childNotLeaf) { + if (!pParent->pPager->ofps) { + pParent->pPager->ofps = taosArrayInit(8, sizeof(SPage *)); + destroyOfps = true; + } + } + tdbPageDropCell(pParent, sIdx, pTxn, pBt); + + if (!childNotLeaf) { + SArray *ofps = pParent->pPager->ofps; + if (ofps) { + for (int i = 0; i < TARRAY_SIZE(ofps); ++i) { + SPage *ofp = *(SPage **)taosArrayGet(ofps, i); + tdbPagerInsertFreePage(pParent->pPager, ofp, pTxn); + } + + if (destroyOfps) { + taosArrayDestroy(ofps); + pParent->pPager->ofps = NULL; + } + } + } } else { ((SIntHdr *)pParent->pData)->pgno = 0; } @@ -861,6 +900,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) { ((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno; } + + tdbPagerInsertFreePage(pBt->pPager, pNews[0], pTxn); } for (int i = 0; i < 3; i++) { @@ -870,6 +911,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx } for (pageIdx = 0; pageIdx < nOlds; ++pageIdx) { + if (pageIdx >= nNews) { + tdbPagerInsertFreePage(pBt->pPager, pOlds[pageIdx], pTxn); + } tdbPagerReturnPage(pBt->pPager, pOlds[pageIdx], pTxn); } for (; pageIdx < nNews; ++pageIdx) { @@ -1311,7 +1355,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, if (ret < 0) { return -1; } - + /* + if (pDecoder->ofps) { + taosArrayPush(pDecoder->ofps, &ofp); + } + */ ofpCell = tdbPageGetCell(ofp, 0); if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { @@ -1346,11 +1394,17 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, int lastKeyPageSpace = 0; // load left key & val to ovpages while (pgno != 0) { + tdbTrace("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p", pTxn, pgno, pCell); + // printf("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p\n", pTxn, pgno, pCell); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt); if (ret < 0) { return -1; } - + /* + if (pDecoder->ofps) { + taosArrayPush(pDecoder->ofps, &ofp); + } + */ ofpCell = tdbPageGetCell(ofp, 0); int lastKeyPage = 0; @@ -1518,8 +1572,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * if (pPage->vLen == TDB_VARIANT_LEN) { if (!leaf) { - tdbError("tdb/btree-cell-size: not a leaf page."); - return -1; + tdbError("tdb/btree-cell-size: not a leaf page:%p, pgno:%" PRIu32 ".", pPage, TDB_PAGE_PGNO(pPage)); + // return -1; } nHeader += tdbGetVarInt(pCell + nHeader, &vLen); } else if (leaf) { @@ -1559,8 +1613,27 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * bytes = ofp->maxLocal - sizeof(SPgno); } + // SPgno origPgno = pgno; memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); + ret = tdbPagerWrite(pBt->pPager, ofp); + if (ret < 0) { + tdbError("failed to write page since %s", terrstr()); + return -1; + } + /* + tdbPageDropCell(ofp, 0, pTxn, pBt); + */ + // SIntHdr *pIntHdr = (SIntHdr *)(ofp->pData); + // pIntHdr->flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL); + // pIntHdr->pgno = 0; + // ofp->pPager = NULL; + + SArray *ofps = pPage->pPager->ofps; + if (ofps) { + taosArrayPush(ofps, &ofp); + } + tdbPagerReturnPage(pPage->pPager, ofp, pTxn); nLeft -= bytes; @@ -1980,6 +2053,11 @@ static int tdbBtcMoveDownward(SBTC *pBtc) { return -1; } + if (TDB_BTREE_PAGE_IS_OVFL(pBtc->pPage)) { + tdbError("tdb/btc-move-downward: should not be a ovfl page here."); + return -1; + } + if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) { pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); pgno = ((SPgno *)pCell)[0]; @@ -2068,8 +2146,27 @@ int tdbBtcDelete(SBTC *pBtc) { return -1; } + bool destroyOfps = false; + if (!pBtc->pPage->pPager->ofps) { + pBtc->pPage->pPager->ofps = taosArrayInit(8, sizeof(SPage *)); + destroyOfps = true; + } + tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt); + SArray *ofps = pBtc->pPage->pPager->ofps; + if (ofps) { + for (int i = 0; i < TARRAY_SIZE(ofps); ++i) { + SPage *ofp = *(SPage **)taosArrayGet(ofps, i); + tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn); + } + + if (destroyOfps) { + taosArrayDestroy(ofps); + pBtc->pPage->pPager->ofps = NULL; + } + } + // update interior page or do balance if (idx == nCells - 1) { if (idx) { @@ -2113,6 +2210,8 @@ int tdbBtcDelete(SBTC *pBtc) { return -1; } + // printf("tdb/btc-delete: btree balance delete pgno: %d.\n", TDB_PAGE_PGNO(pBtc->pPage)); + ret = tdbBtreeBalance(pBtc); if (ret < 0) { tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret); @@ -2181,7 +2280,13 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int tdbError("tdb/btc-upsert: page insert/update cell failed with ret: %d.", ret); return -1; } - + /* + bool destroyOfps = false; + if (!pBtc->pPage->pPager->ofps) { + pBtc->pPage->pPager->ofps = taosArrayInit(8, sizeof(SPage *)); + destroyOfps = true; + } + */ // check balance if (pBtc->pPage->nOverflow > 0) { ret = tdbBtreeBalance(pBtc); @@ -2190,7 +2295,20 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int return -1; } } + /* + SArray *ofps = pBtc->pPage->pPager->ofps; + if (ofps) { + for (int i = 0; i < TARRAY_SIZE(ofps); ++i) { + SPage *ofp = *(SPage **)taosArrayGet(ofps, i); + tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn); + } + if (destroyOfps) { + taosArrayDestroy(ofps); + pBtc->pPage->pPager->ofps = NULL; + } + } + */ return 0; } diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 952c49db73..fe9d51dc82 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -70,6 +70,11 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i if (ret < 0) { return -1; } + + ret = tdbTbOpen(TDB_FREEDB_NAME, sizeof(SPgno), 0, NULL, pDb, &pDb->pFreeDb, rollback); + if (ret < 0) { + return -1; + } #endif *ppDb = pDb; @@ -82,6 +87,7 @@ int tdbClose(TDB *pDb) { if (pDb) { #ifdef USE_MAINDB if (pDb->pMainDb) tdbTbClose(pDb->pMainDb); + if (pDb->pFreeDb) tdbTbClose(pDb->pFreeDb); #endif for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) { diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 5ea9be63db..896b0713df 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -292,7 +292,23 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { */ return 0; } +/* +int tdbPagerCancelDirty(SPager *pPager, SPage *pPage, TXN *pTxn) { + SRBTreeNode *pNode = tRBTreeGet(&pPager->rbt, (SRBTreeNode *)pPage); + if (pNode) { + pPage->isDirty = 0; + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + if (pTxn->jPageSet) { + hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); + } + + tdbPCacheRelease(pPager->pCache, pPage, pTxn); + } + + return 0; +} +*/ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { SPage *pPage; int ret; @@ -338,10 +354,13 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { if (pTxn->jPageSet) { hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); } + + tdbTrace("tdb/pager-commit: remove page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt); + tdbPCacheRelease(pPager->pCache, pPage, pTxn); } - tdbTrace("pager/commit reset dirty tree: %p", &pPager->rbt); + tdbTrace("tdb/pager-commit reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); // sync the db file @@ -629,6 +648,8 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) { return 0; } +static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn); + int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, TXN *pTxn) { SPage *pPage; @@ -643,7 +664,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa // alloc new page if (pgno == 0) { loadPage = 0; - ret = tdbPagerAllocPage(pPager, &pgno); + ret = tdbPagerAllocPage(pPager, &pgno, pTxn); if (ret < 0) { tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno); return -1; @@ -695,23 +716,114 @@ void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) { // TDB_PAGE_PGNO(pPage), pPage); } -static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) { - // TODO: Allocate a page from the free list +int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) { + int code = 0; + SPgno pgno = TDB_PAGE_PGNO(pPage); + + if (pPager->frps) { + taosArrayPush(pPager->frps, &pgno); + pPage->pPager = NULL; + return code; + } + + pPager->frps = taosArrayInit(8, sizeof(SPgno)); + // memset(pPage->pData, 0, pPage->pageSize); + tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno); + // printf("tdb/insert-free-page: tbc recycle page: %d.\n", pgno); + code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn); + if (code < 0) { + tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code); + taosArrayDestroy(pPager->frps); + pPager->frps = NULL; + return -1; + } + + while (TARRAY_SIZE(pPager->frps) > 0) { + pgno = *(SPgno *)taosArrayPop(pPager->frps); + + code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn); + if (code < 0) { + tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code); + taosArrayDestroy(pPager->frps); + pPager->frps = NULL; + return -1; + } + } + + taosArrayDestroy(pPager->frps); + pPager->frps = NULL; + + pPage->pPager = NULL; + + return code; +} + +static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) { + int code = 0; + TBC *pCur; + + if (!pPager->pEnv->pFreeDb) { + return code; + } + + if (pPager->frps) { + return code; + } + + code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, pTxn); + if (code < 0) { + return 0; + } + + code = tdbTbcMoveToFirst(pCur); + if (code) { + tdbError("tdb/remove-free-page: moveto first failed with ret: %d.", code); + tdbTbcClose(pCur); + return 0; + } + + void *pKey = NULL; + int nKey = 0; + + code = tdbTbcGet(pCur, (const void **)&pKey, &nKey, NULL, NULL); + if (code < 0) { + // tdbError("tdb/remove-free-page: tbc get failed with ret: %d.", code); + tdbTbcClose(pCur); + return 0; + } + + *pPgno = *(SPgno *)pKey; + tdbTrace("tdb/remove-free-page: tbc get page: %d.", *pPgno); + // printf("tdb/remove-free-page: tbc get page: %d.\n", *pPgno); + + code = tdbTbcDelete(pCur); + if (code < 0) { + tdbError("tdb/remove-free-page: tbc delete failed with ret: %d.", code); + tdbTbcClose(pCur); + return 0; + } + tdbTbcClose(pCur); return 0; } +static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno, TXN *pTxn) { + // Allocate a page from the free list + return tdbPagerRemoveFreePage(pPager, ppgno, pTxn); +} + static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) { *ppgno = ++pPager->dbFileSize; + // tdbError("tdb/alloc-new-page: %d.", *ppgno); return 0; } -int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) { +static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) { int ret; *ppgno = 0; // Try to allocate from the free list of the pager - ret = tdbPagerAllocFreePage(pPager, ppgno); + ret = tdbPagerAllocFreePage(pPager, ppgno, pTxn); if (ret < 0) { return -1; } diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 7a0bcc00a4..8ce294a3c6 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -131,13 +131,14 @@ typedef struct SBtInfo { #define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL) typedef struct { - int kLen; - u8 *pKey; - int vLen; - u8 *pVal; - SPgno pgno; - u8 *pBuf; - u8 freeKV; + int kLen; + u8 *pKey; + int vLen; + u8 *pVal; + SPgno pgno; + u8 *pBuf; + u8 freeKV; + SArray *ofps; } SCellDecoder; struct SBTC { @@ -198,9 +199,10 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn); int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); -int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); -int tdbPagerRestoreJournals(SPager *pPager); -int tdbPagerRollback(SPager *pPager); +int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn); +// int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); +int tdbPagerRestoreJournals(SPager *pPager); +int tdbPagerRollback(SPager *pPager); // tdbPCache.c ==================================== #define TDB_PCACHE_PAGE \ @@ -373,6 +375,7 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) { #ifdef USE_MAINDB #define TDB_MAINDB_NAME "main.tdb" +#define TDB_FREEDB_NAME "_free.db" #endif struct STDB { @@ -386,6 +389,7 @@ struct STDB { SPager **pgrHash; #ifdef USE_MAINDB TTB *pMainDb; + TTB *pFreeDb; #endif int64_t txnId; }; @@ -403,6 +407,8 @@ struct SPager { SRBTree rbt; // u8 inTran; TXN *pActiveTxn; + SArray *ofps; + SArray *frps; SPager *pNext; // used by TDB SPager *pHashNext; // used by TDB #ifdef USE_MAINDB diff --git a/source/libs/tdb/test/CMakeLists.txt b/source/libs/tdb/test/CMakeLists.txt index fd4d7c101d..4715ccbd41 100644 --- a/source/libs/tdb/test/CMakeLists.txt +++ b/source/libs/tdb/test/CMakeLists.txt @@ -14,3 +14,7 @@ target_link_libraries(tdbExOVFLTest tdb gtest gtest_main) add_executable(tdbPageDefragmentTest "tdbPageDefragmentTest.cpp") target_link_libraries(tdbPageDefragmentTest tdb gtest gtest_main) +# page recycling testing +add_executable(tdbPageRecycleTest "tdbPageRecycleTest.cpp") +target_link_libraries(tdbPageRecycleTest tdb gtest gtest_main) + diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp index b16bc643d3..325703c946 100644 --- a/source/libs/tdb/test/tdbExOVFLTest.cpp +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -190,6 +190,15 @@ static void insertOfp(void) { // commit current transaction tdbCommit(pEnv, txn); tdbPostCommit(pEnv, txn); + + closePool(pPool); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); } // TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) { @@ -233,6 +242,13 @@ TEST(TdbOVFLPagesTest, TbGetTest) { tdbFree(pVal); } + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); } // TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) { @@ -334,6 +350,15 @@ tdbBegin(pEnv, &txn); // commit current transaction tdbCommit(pEnv, txn); tdbPostCommit(pEnv, txn); + + closePool(pPool); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); } // TEST(tdb_test, DISABLED_simple_insert1) { @@ -407,6 +432,8 @@ TEST(tdb_test, simple_insert1) { tdbCommit(pEnv, txn); tdbPostCommit(pEnv, txn); + closePool(pPool); + { // Query the data void *pVal = NULL; int vLen; diff --git a/source/libs/tdb/test/tdbPageRecycleTest.cpp b/source/libs/tdb/test/tdbPageRecycleTest.cpp new file mode 100644 index 0000000000..4d7b314917 --- /dev/null +++ b/source/libs/tdb/test/tdbPageRecycleTest.cpp @@ -0,0 +1,835 @@ +#include + +#define ALLOW_FORBID_FUNC +#include "os.h" +#include "tdb.h" + +#include +#include +#include +#include +#include "tlog.h" + +typedef struct SPoolMem { + int64_t size; + struct SPoolMem *prev; + struct SPoolMem *next; +} SPoolMem; + +static SPoolMem *openPool() { + SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool)); + + pPool->prev = pPool->next = pPool; + pPool->size = 0; + + return pPool; +} + +static void clearPool(SPoolMem *pPool) { + SPoolMem *pMem; + + do { + pMem = pPool->next; + + if (pMem == pPool) break; + + pMem->next->prev = pMem->prev; + pMem->prev->next = pMem->next; + pPool->size -= pMem->size; + + taosMemoryFree(pMem); + } while (1); + + assert(pPool->size == 0); +} + +static void closePool(SPoolMem *pPool) { + clearPool(pPool); + taosMemoryFree(pPool); +} + +static void *poolMalloc(void *arg, size_t size) { + void *ptr = NULL; + SPoolMem *pPool = (SPoolMem *)arg; + SPoolMem *pMem; + + pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size); + if (pMem == NULL) { + assert(0); + } + + pMem->size = sizeof(*pMem) + size; + pMem->next = pPool->next; + pMem->prev = pPool; + + pPool->next->prev = pMem; + pPool->next = pMem; + pPool->size += pMem->size; + + ptr = (void *)(&pMem[1]); + return ptr; +} + +static void poolFree(void *arg, void *ptr) { + SPoolMem *pPool = (SPoolMem *)arg; + SPoolMem *pMem; + + pMem = &(((SPoolMem *)ptr)[-1]); + + pMem->next->prev = pMem->prev; + pMem->prev->next = pMem->next; + pPool->size -= pMem->size; + + taosMemoryFree(pMem); +} + +static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { + int k1, k2; + + std::string s1((char *)pKey1 + 3, kLen1 - 3); + std::string s2((char *)pKey2 + 3, kLen2 - 3); + k1 = stoi(s1); + k2 = stoi(s2); + + if (k1 < k2) { + return -1; + } else if (k1 > k2) { + return 1; + } else { + return 0; + } +} + +static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) { + int mlen; + int cret; + + ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL); + + mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2; + cret = memcmp(pKey1, pKey2, mlen); + if (cret == 0) { + if (keyLen1 < keyLen2) { + cret = -1; + } else if (keyLen1 > keyLen2) { + cret = 1; + } else { + cret = 0; + } + } + return cret; +} + +static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) { + TDB *pEnv = NULL; + + int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0); + if (ret) { + pEnv = NULL; + } + + return pEnv; +} + +static void generateBigVal(char *val, int valLen) { + for (int i = 0; i < valLen; ++i) { + char c = char(i & 0xff); + if (c == 0) { + c = 1; + } + val[i] = c; + } +} + +static void insertOfp(void) { + int ret = 0; + + // open Env + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN *txn = NULL; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + // generate value payload + // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + char val[32605]; + int valLen = sizeof(val) / sizeof(val[0]); + generateBigVal(val, valLen); + + // insert the generated big data + // char const *key = "key1"; + char const *key = "key123456789"; + ret = tdbTbInsert(pDb, key, strlen(key) + 1, val, valLen, txn); + GTEST_ASSERT_EQ(ret, 0); + + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + closePool(pPool); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); +} + +static void clearDb(char const *db) { taosRemoveDir(db); } + +TEST(TdbPageRecycleTest, DISABLED_TbInsertTest) { + // TEST(TdbPageRecycleTest, TbInsertTest) { + // ofp inserting + clearDb("tdb"); + insertOfp(); +} + +TEST(TdbPageRecycleTest, DISABLED_TbGetTest) { + // TEST(TdbPageRecycleTest, TbGetTest) { + clearDb("tdb"); + insertOfp(); + + // open Env + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // generate value payload + // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + char val[32605]; + int valLen = sizeof(val) / sizeof(val[0]); + generateBigVal(val, valLen); + + { // Query the data + void *pVal = NULL; + int vLen; + + // char const *key = "key1"; + char const *key = "key123456789"; + ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, valLen); + GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0); + + tdbFree(pVal); + } +} + +TEST(TdbPageRecycleTest, DISABLED_TbDeleteTest) { + // TEST(TdbPageRecycleTest, TbDeleteTest) { + int ret = 0; + + taosRemoveDir("tdb"); + + // open Env + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN *txn; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + // generate value payload + // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + char val[((4083 - 4 - 3 - 2) + 1) * 2]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int valLen = sizeof(val) / sizeof(val[0]); + generateBigVal(val, valLen); + + { // insert the generated big data + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn); + GTEST_ASSERT_EQ(ret, 0); + } + + { // query the data + void *pVal = NULL; + int vLen; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, valLen); + GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0); + + tdbFree(pVal); + } + /* open to debug committed file +tdbCommit(pEnv, &txn); +tdbTxnClose(&txn); + +++txnid; +tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); +tdbBegin(pEnv, &txn); + */ + { // upsert the data + ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), txn); + GTEST_ASSERT_EQ(ret, 0); + } + + { // query the upserted data + void *pVal = NULL; + int vLen; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, strlen("value1")); + GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0); + + tdbFree(pVal); + } + + { // delete the data + ret = tdbTbDelete(pDb, "key1", strlen("key1"), txn); + GTEST_ASSERT_EQ(ret, 0); + } + + { // query the deleted data + void *pVal = NULL; + int vLen = -1; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == -1); + GTEST_ASSERT_EQ(ret, -1); + + GTEST_ASSERT_EQ(vLen, -1); + GTEST_ASSERT_EQ(pVal, nullptr); + + tdbFree(pVal); + } + + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); +} + +TEST(TdbPageRecycleTest, DISABLED_simple_insert1) { + // TEST(TdbPageRecycleTest, simple_insert1) { + int ret; + TDB *pEnv; + TTB *pDb; + tdb_cmpr_fn_t compFunc; + int nData = 1; + TXN *txn; + int const pageSize = 4096; + + taosRemoveDir("tdb"); + + // Open Env + ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0); + GTEST_ASSERT_EQ(ret, 0); + + // Create a database + compFunc = tKeyCmpr; + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + { + char key[64]; + // char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int64_t poolLimit = 4096; // 1M pool limit + SPoolMem *pPool; + + // open the pool + pPool = openPool(); + + // start a transaction + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + for (int iData = 1; iData <= nData; iData++) { + sprintf(key, "key0"); + sprintf(val, "value%d", iData); + + // ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + // GTEST_ASSERT_EQ(ret, 0); + + // generate value payload + int valLen = sizeof(val) / sizeof(val[0]); + for (int i = 6; i < valLen; ++i) { + char c = char(i & 0xff); + if (c == 0) { + c = 1; + } + val[i] = c; + } + + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn); + GTEST_ASSERT_EQ(ret, 0); + + // if pool is full, commit the transaction and start a new one + if (pPool->size >= poolLimit) { + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + // start a new transaction + clearPool(pPool); + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + } + } + + // commit the transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + { // Query the data + void *pVal = NULL; + int vLen; + + for (int i = 1; i <= nData; i++) { + sprintf(key, "key%d", i); + // sprintf(val, "value%d", i); + + ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, sizeof(val) / sizeof(val[0])); + GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0); + } + + tdbFree(pVal); + } + + { // Iterate to query the DB data + TBC *pDBC; + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + int count = 0; + + ret = tdbTbcOpen(pDb, &pDBC, NULL); + GTEST_ASSERT_EQ(ret, 0); + + tdbTbcMoveToFirst(pDBC); + + for (;;) { + ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) break; + + // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " "; + // std::cout.write((char *)pVal, vLen) /* << " " << vLen */; + // std::cout << std::endl; + + count++; + } + + GTEST_ASSERT_EQ(count, nData); + + tdbTbcClose(pDBC); + + tdbFree(pKey); + tdbFree(pVal); + } + } + + ret = tdbTbDrop(pDb); + GTEST_ASSERT_EQ(ret, 0); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); +} + +static void insertDb(int nData) { + int ret = 0; + TDB *pEnv = NULL; + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc; + TXN *txn = NULL; + int const pageSize = 4 * 1024; + + // Open Env + ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0); + GTEST_ASSERT_EQ(ret, 0); + + // Create a database + compFunc = tKeyCmpr; + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // 1, insert nData kv + { + char key[64]; + char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int64_t poolLimit = 4096; // 1M pool limit + SPoolMem *pPool; + + // open the pool + pPool = openPool(); + + // start a transaction + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + for (int iData = 0; iData < nData; ++iData) { + sprintf(key, "key%03d", iData); + sprintf(val, "value%03d", iData); + + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn); + GTEST_ASSERT_EQ(ret, 0); + // if pool is full, commit the transaction and start a new one + if (pPool->size >= poolLimit) { + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + // start a new transaction + clearPool(pPool); + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + } + } + + // commit the transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + // 2, delete nData/2 records + + closePool(pPool); + } + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); + + system("ls -l ./tdb"); +} + +static void deleteDb(int nData) { + int ret = 0; + TDB *pEnv = NULL; + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc; + TXN *txn = NULL; + int const pageSize = 4 * 1024; + + // Open Env + ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0); + GTEST_ASSERT_EQ(ret, 0); + + // Create a database + compFunc = tKeyCmpr; + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // 2, delete nData/2 records + { + char key[64]; + char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int64_t poolLimit = 4096; // 1M pool limit + SPoolMem *pPool; + + // open the pool + pPool = openPool(); + + // start a transaction + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + for (int iData = 0; iData < nData; iData++) { + // if (iData % 2 == 0) continue; + + sprintf(key, "key%03d", iData); + sprintf(val, "value%03d", iData); + + { // delete the data + ret = tdbTbDelete(pDb, key, strlen(key), txn); + GTEST_ASSERT_EQ(ret, 0); + } + // if pool is full, commit the transaction and start a new one + if (pPool->size >= poolLimit) { + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + // start a new transaction + clearPool(pPool); + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + } + } + + // commit the transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + closePool(pPool); + } + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); + + system("ls -l ./tdb"); +} + +static const int nDataConst = 256 * 19; + +// TEST(TdbPageRecycleTest, DISABLED_seq_insert) { +TEST(TdbPageRecycleTest, seq_insert) { + clearDb("tdb"); + insertDb(nDataConst); +} + +// TEST(TdbPageRecycleTest, DISABLED_seq_delete) { +TEST(TdbPageRecycleTest, seq_delete) { deleteDb(nDataConst); } + +// TEST(TdbPageRecycleTest, DISABLED_recycly_insert) { +TEST(TdbPageRecycleTest, recycly_insert) { insertDb(nDataConst); } + +// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp) { +TEST(TdbPageRecycleTest, recycly_seq_insert_ofp) { + clearDb("tdb"); + insertOfp(); + system("ls -l ./tdb"); +} + +static void deleteOfp(void) { + // open Env + int ret = 0; + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN *txn; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + { // delete the data + char const *key = "key123456789"; + ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn); + GTEST_ASSERT_EQ(ret, 0); + } + + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + closePool(pPool); + + ret = tdbTbDrop(pDb); + GTEST_ASSERT_EQ(ret, 0); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); +} + +// TEST(TdbPageRecycleTest, DISABLED_seq_delete_ofp) { +TEST(TdbPageRecycleTest, seq_delete_ofp) { + deleteOfp(); + system("ls -l ./tdb"); +} + +// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_again) { +TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_again) { + insertOfp(); + system("ls -l ./tdb"); +} + +// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_nocommit) { +TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_nocommit) { + clearDb("tdb"); + insertOfp(); + system("ls -l ./tdb"); + + // open Env + int ret = 0; + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN *txn; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + { // delete the data + char const *key = "key123456789"; + ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn); + GTEST_ASSERT_EQ(ret, 0); + } + + // 1, insert nData kv + { + int nData = nDataConst; + char key[64]; + char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int64_t poolLimit = 4096; // 1M pool limit + + for (int iData = 0; iData < nData; ++iData) { + sprintf(key, "key%03d", iData); + sprintf(val, "value%03d", iData); + + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn); + GTEST_ASSERT_EQ(ret, 0); + // if pool is full, commit the transaction and start a new one + if (pPool->size >= poolLimit) { + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + // start a new transaction + clearPool(pPool); + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + } + } + } + + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + closePool(pPool); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); + + system("ls -l ./tdb"); +} + +// TEST(TdbPageRecycleTest, DISABLED_recycly_delete_interior_ofp_nocommit) { +TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) { + clearDb("tdb"); + + // open Env + int ret = 0; + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = NULL; // tKeyCmpr; + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN *txn; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + + char key[1024] = {0}; + int count = sizeof(key) / sizeof(key[0]); + for (int i = 0; i < count - 1; ++i) { + key[i] = 'a'; + } + + // insert n ofp keys to form 2-layer btree + { + for (int i = 0; i < 7; ++i) { + // sprintf(&key[count - 2], "%c", i); + key[count - 2] = '0' + i; + + ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn); + GTEST_ASSERT_EQ(ret, 0); + } + } + /* + // delete one interior key + { + sprintf(&key[count - 2], "%c", 2); + key[count - 2] = '0' + 2; + + ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn); + GTEST_ASSERT_EQ(ret, 0); + } + */ + // commit current transaction + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); + + closePool(pPool); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); + + system("ls -l ./tdb"); +} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d2b9edf753..7d3859e04a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -568,6 +568,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SELECTED_EXPR, "Invalid SELECTed ex TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table info") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not allowed") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/tests/script/tsim/sma/drop_sma.sim b/tests/script/tsim/sma/drop_sma.sim index 0d2712f8db..8fd8ebdcfd 100644 --- a/tests/script/tsim/sma/drop_sma.sim +++ b/tests/script/tsim/sma/drop_sma.sim @@ -129,6 +129,7 @@ sql DROP INDEX sma_index_3 ; print ========== step8 sql drop database if exists db; +sleep 2000 sql create database db duration 300; sql use db; sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);