Merge branch '3.0' into enh/TD-23769-3.0
This commit is contained in:
commit
cb8b10f6a2
|
@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
|
||||||
|
|
||||||
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
|
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
|
||||||
|
|
||||||
void commitAsync();
|
|
||||||
|
|
||||||
void commitAsync(OffsetCommitCallback callback);
|
|
||||||
|
|
||||||
void commitSync() throws SQLException;
|
void commitSync() throws SQLException;
|
||||||
|
|
||||||
void close() throws SQLException;
|
void close() throws SQLException;
|
||||||
|
|
|
@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java.
|
||||||
|
|
||||||
| taos-jdbcdriver version | major changes | TDengine version |
|
| taos-jdbcdriver version | major changes | TDengine version |
|
||||||
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
|
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
|
||||||
|
| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later |
|
||||||
| 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later |
|
| 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later |
|
||||||
| 3.2.2 | subscription add seek function | 3.0.5.0 or later |
|
| 3.2.2 | Subscription add seek function | 3.0.5.0 or later |
|
||||||
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
|
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
|
||||||
| 3.2.0 | This version has been deprecated | - |
|
| 3.2.0 | This version has been deprecated | - |
|
||||||
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
|
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
|
||||||
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - |
|
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - |
|
||||||
| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
|
| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
|
||||||
| 2.0.42 | fix wasNull interface return value in WebSocket connection | - |
|
| 2.0.42 | Fix wasNull interface return value in WebSocket connection | - |
|
||||||
| 2.0.41 | fix decode method of username and password in REST connection | - |
|
| 2.0.41 | Fix decode method of username and password in REST connection | - |
|
||||||
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
|
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
|
||||||
| 2.0.38 | JDBC REST connections add bulk pull function | - |
|
| 2.0.38 | JDBC REST connections add bulk pull function | - |
|
||||||
| 2.0.37 | Support json tags | - |
|
| 2.0.37 | Support json tags | - |
|
||||||
|
|
|
@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
|
||||||
|
|
||||||
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
|
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
|
||||||
|
|
||||||
void commitAsync();
|
|
||||||
|
|
||||||
void commitAsync(OffsetCommitCallback callback);
|
|
||||||
|
|
||||||
void commitSync() throws SQLException;
|
void commitSync() throws SQLException;
|
||||||
|
|
||||||
void close() throws SQLException;
|
void close() throws SQLException;
|
||||||
|
|
|
@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。
|
||||||
|
|
||||||
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
|
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
|
||||||
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
|
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
|
||||||
|
| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - |
|
||||||
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
|
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
|
||||||
| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 |
|
| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 |
|
||||||
| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 |
|
| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 |
|
||||||
|
|
|
@ -3025,6 +3025,7 @@ typedef struct {
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
int64_t deleteMark;
|
int64_t deleteMark;
|
||||||
|
int64_t lastTs;
|
||||||
} SMCreateSmaReq;
|
} SMCreateSmaReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||||
|
|
|
@ -319,19 +319,22 @@ typedef struct SIndexOptions {
|
||||||
SNode* pInterval;
|
SNode* pInterval;
|
||||||
SNode* pOffset;
|
SNode* pOffset;
|
||||||
SNode* pSliding;
|
SNode* pSliding;
|
||||||
|
int8_t tsPrecision;
|
||||||
SNode* pStreamOptions;
|
SNode* pStreamOptions;
|
||||||
} SIndexOptions;
|
} SIndexOptions;
|
||||||
|
|
||||||
typedef struct SCreateIndexStmt {
|
typedef struct SCreateIndexStmt {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
EIndexType indexType;
|
EIndexType indexType;
|
||||||
bool ignoreExists;
|
bool ignoreExists;
|
||||||
char indexDbName[TSDB_DB_NAME_LEN];
|
char indexDbName[TSDB_DB_NAME_LEN];
|
||||||
char indexName[TSDB_INDEX_NAME_LEN];
|
char indexName[TSDB_INDEX_NAME_LEN];
|
||||||
char dbName[TSDB_DB_NAME_LEN];
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
char tableName[TSDB_TABLE_NAME_LEN];
|
char tableName[TSDB_TABLE_NAME_LEN];
|
||||||
SNodeList* pCols;
|
SNodeList* pCols;
|
||||||
SIndexOptions* pOptions;
|
SIndexOptions* pOptions;
|
||||||
|
SNode* pPrevQuery;
|
||||||
|
SMCreateSmaReq* pReq;
|
||||||
} SCreateIndexStmt;
|
} SCreateIndexStmt;
|
||||||
|
|
||||||
typedef struct SDropIndexStmt {
|
typedef struct SDropIndexStmt {
|
||||||
|
|
|
@ -706,6 +706,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666)
|
#define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666)
|
||||||
#define TSDB_CODE_PAR_INVALID_OPTR_USAGE TAOS_DEF_ERROR_CODE(0, 0x2667)
|
#define TSDB_CODE_PAR_INVALID_OPTR_USAGE TAOS_DEF_ERROR_CODE(0, 0x2667)
|
||||||
#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2668)
|
#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2668)
|
||||||
|
#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2669)
|
||||||
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
||||||
|
|
||||||
//planner
|
//planner
|
||||||
|
|
|
@ -835,6 +835,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
|
||||||
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
|
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -884,6 +885,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
|
||||||
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -79,16 +79,18 @@ typedef struct {
|
||||||
TXN* pTxn;
|
TXN* pTxn;
|
||||||
} STtlDelTtlCtx;
|
} STtlDelTtlCtx;
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
|
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
|
||||||
int ttlMgrClose(STtlManger* pTtlMgr);
|
void ttlMgrClose(STtlManger* pTtlMgr);
|
||||||
int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta);
|
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
|
||||||
|
|
||||||
int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta);
|
bool ttlMgrNeedUpgrade(TDB* pEnv);
|
||||||
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
|
int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
|
||||||
|
|
||||||
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
|
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
|
||||||
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
|
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
|
||||||
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
|
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
|
||||||
|
|
||||||
|
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
|
||||||
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
|
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -136,6 +136,7 @@ typedef struct STbUidStore STbUidStore;
|
||||||
#define META_BEGIN_HEAP_NIL 2
|
#define META_BEGIN_HEAP_NIL 2
|
||||||
|
|
||||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
||||||
|
int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
|
||||||
int metaClose(SMeta** pMeta);
|
int metaClose(SMeta** pMeta);
|
||||||
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
||||||
TXN* metaGetTxn(SMeta* pMeta);
|
TXN* metaGetTxn(SMeta* pMeta);
|
||||||
|
|
|
@ -40,10 +40,6 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdbCommit(pMeta->pEnv, pMeta->txn);
|
tdbCommit(pMeta->pEnv, pMeta->txn);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -29,6 +29,8 @@ static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen
|
||||||
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
||||||
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
||||||
|
|
||||||
|
static void metaCleanup(SMeta **ppMeta);
|
||||||
|
|
||||||
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
SMeta *pMeta = NULL;
|
SMeta *pMeta = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
|
@ -180,51 +182,43 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
metaCleanup(&pMeta);
|
||||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
|
||||||
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
|
||||||
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
|
|
||||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
|
||||||
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
|
|
||||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
|
||||||
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
|
||||||
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
|
||||||
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
|
|
||||||
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
|
||||||
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
|
||||||
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
|
|
||||||
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
|
|
||||||
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
|
|
||||||
metaDestroyLock(pMeta);
|
|
||||||
taosMemoryFree(pMeta);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaClose(SMeta **ppMeta) {
|
int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
SMeta *pMeta = *ppMeta;
|
SMeta *pMeta = *ppMeta;
|
||||||
if (pMeta) {
|
|
||||||
if (pMeta->pEnv) metaAbort(pMeta);
|
|
||||||
if (pMeta->pCache) metaCacheClose(pMeta);
|
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
|
||||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
|
||||||
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
|
||||||
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
|
|
||||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
|
||||||
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
|
|
||||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
|
||||||
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
|
||||||
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
|
||||||
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
|
|
||||||
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
|
||||||
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
|
||||||
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
|
|
||||||
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
|
|
||||||
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
|
|
||||||
metaDestroyLock(pMeta);
|
|
||||||
|
|
||||||
taosMemoryFreeClear(*ppMeta);
|
if (ttlMgrNeedUpgrade(pMeta->pEnv)) {
|
||||||
|
code = metaBegin(pMeta, META_BEGIN_HEAP_OS);
|
||||||
|
if (code < 0) {
|
||||||
|
metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta);
|
||||||
|
if (code < 0) {
|
||||||
|
metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = metaCommit(pMeta, pMeta->txn);
|
||||||
|
if (code < 0) {
|
||||||
|
metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
metaCleanup(ppMeta);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int metaClose(SMeta **ppMeta) {
|
||||||
|
metaCleanup(ppMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,6 +264,32 @@ int32_t metaULock(SMeta *pMeta) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void metaCleanup(SMeta **ppMeta) {
|
||||||
|
SMeta *pMeta = *ppMeta;
|
||||||
|
if (pMeta) {
|
||||||
|
if (pMeta->pEnv) metaAbort(pMeta);
|
||||||
|
if (pMeta->pCache) metaCacheClose(pMeta);
|
||||||
|
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||||
|
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
||||||
|
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
||||||
|
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
|
||||||
|
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||||
|
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
|
||||||
|
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||||
|
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
||||||
|
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
||||||
|
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
|
||||||
|
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
||||||
|
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
||||||
|
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
|
||||||
|
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
|
||||||
|
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
|
||||||
|
metaDestroyLock(pMeta);
|
||||||
|
|
||||||
|
taosMemoryFreeClear(*ppMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||||
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
|
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
|
||||||
STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2;
|
STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2;
|
||||||
|
|
|
@ -21,6 +21,10 @@ typedef struct {
|
||||||
SMeta *pMeta;
|
SMeta *pMeta;
|
||||||
} SConvertData;
|
} SConvertData;
|
||||||
|
|
||||||
|
static void ttlMgrCleanup(STtlManger *pTtlMgr);
|
||||||
|
|
||||||
|
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
|
||||||
|
|
||||||
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
|
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
|
||||||
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
|
@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx";
|
||||||
const char *ttlV1Tbname = "ttlv1.idx";
|
const char *ttlV1Tbname = "ttlv1.idx";
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||||
int ret;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
*ppTtlMgr = NULL;
|
*ppTtlMgr = NULL;
|
||||||
|
|
||||||
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
||||||
if (pTtlMgr == NULL) {
|
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbExist(ttlTbname, pEnv)) {
|
|
||||||
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback);
|
|
||||||
if (ret < 0) {
|
|
||||||
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
|
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
|
||||||
|
|
||||||
tdbOsFree(pTtlMgr);
|
tdbOsFree(pTtlMgr);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||||
|
|
||||||
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
|
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
|
||||||
|
|
||||||
|
ret = ttlMgrFillCache(pTtlMgr);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("failed to fill hash since %s", tstrerror(terrno));
|
||||||
|
ttlMgrCleanup(pTtlMgr);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t endNs = taosGetTimestampNs();
|
||||||
|
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
||||||
|
endNs - startNs);
|
||||||
|
|
||||||
*ppTtlMgr = pTtlMgr;
|
*ppTtlMgr = pTtlMgr;
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrClose(STtlManger *pTtlMgr) {
|
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
|
||||||
taosHashCleanup(pTtlMgr->pTtlCache);
|
|
||||||
taosHashCleanup(pTtlMgr->pDirtyUids);
|
bool ttlMgrNeedUpgrade(TDB *pEnv) {
|
||||||
tdbTbClose(pTtlMgr->pTtlIdx);
|
bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
|
||||||
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
if (needUpgrade) {
|
||||||
tdbOsFree(pTtlMgr);
|
metaInfo("find ttl idx in old version , will convert");
|
||||||
return 0;
|
}
|
||||||
|
return needUpgrade;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) {
|
int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
|
||||||
metaInfo("ttl mgr start open");
|
SMeta *meta = (SMeta *)pMeta;
|
||||||
int ret;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
metaInfo("ttl mgr start upgrade");
|
||||||
|
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
SMeta *meta = (SMeta *)pMeta;
|
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTtlMgr->pOldTtlIdx) {
|
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
||||||
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
if (ret < 0) {
|
||||||
if (ret < 0) {
|
metaError("failed to convert ttl index since %s", tstrerror(terrno));
|
||||||
metaError("failed to convert ttl index since %s", tstrerror(terrno));
|
goto _out;
|
||||||
goto _out;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
|
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
|
||||||
|
|
||||||
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
|
||||||
pTtlMgr->pOldTtlIdx = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ttlMgrFillCache(pTtlMgr);
|
ret = ttlMgrFillCache(pTtlMgr);
|
||||||
|
@ -111,13 +120,23 @@ int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t endNs = taosGetTimestampNs();
|
int64_t endNs = taosGetTimestampNs();
|
||||||
|
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
||||||
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
|
||||||
endNs - startNs);
|
endNs - startNs);
|
||||||
_out:
|
_out:
|
||||||
|
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
||||||
|
pTtlMgr->pOldTtlIdx = NULL;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
||||||
|
taosHashCleanup(pTtlMgr->pTtlCache);
|
||||||
|
taosHashCleanup(pTtlMgr->pDirtyUids);
|
||||||
|
tdbTbClose(pTtlMgr->pTtlIdx);
|
||||||
|
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
||||||
|
tdbOsFree(pTtlMgr);
|
||||||
|
}
|
||||||
|
|
||||||
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
||||||
if (ttlDays <= 0) return;
|
if (ttlDays <= 0) return;
|
||||||
|
|
||||||
|
@ -205,7 +224,7 @@ _out:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
||||||
SMeta *meta = pMeta;
|
SMeta *meta = pMeta;
|
||||||
|
|
||||||
metaInfo("ttlMgr convert ttl start.");
|
metaInfo("ttlMgr convert ttl start.");
|
||||||
|
|
|
@ -76,7 +76,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncCfg *pCfg = &info.config.syncCfg;
|
SSyncCfg *pCfg = &info.config.syncCfg;
|
||||||
|
|
||||||
pCfg->replicaNum = 0;
|
pCfg->replicaNum = 0;
|
||||||
pCfg->totalReplicaNum = 0;
|
pCfg->totalReplicaNum = 0;
|
||||||
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
|
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
|
||||||
|
@ -109,7 +109,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
||||||
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d",
|
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d",
|
||||||
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
|
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
|
||||||
|
|
||||||
info.config.syncCfg = *pCfg;
|
info.config.syncCfg = *pCfg;
|
||||||
|
@ -372,6 +372,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
|
||||||
|
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
// open tsdb
|
// open tsdb
|
||||||
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
|
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
|
||||||
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
|
|
@ -907,6 +907,10 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode;
|
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode;
|
||||||
nodesDestroyNode((SNode*)pStmt->pOptions);
|
nodesDestroyNode((SNode*)pStmt->pOptions);
|
||||||
nodesDestroyList(pStmt->pCols);
|
nodesDestroyList(pStmt->pCols);
|
||||||
|
if (pStmt->pReq) {
|
||||||
|
tFreeSMCreateSmaReq(pStmt->pReq);
|
||||||
|
taosMemoryFreeClear(pStmt->pReq);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_DROP_INDEX_STMT: // no pointer field
|
case QUERY_NODE_DROP_INDEX_STMT: // no pointer field
|
||||||
|
@ -1053,6 +1057,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
}
|
}
|
||||||
case QUERY_NODE_QUERY: {
|
case QUERY_NODE_QUERY: {
|
||||||
SQuery* pQuery = (SQuery*)pNode;
|
SQuery* pQuery = (SQuery*)pNode;
|
||||||
|
nodesDestroyNode(pQuery->pPrevRoot);
|
||||||
nodesDestroyNode(pQuery->pRoot);
|
nodesDestroyNode(pQuery->pRoot);
|
||||||
nodesDestroyNode(pQuery->pPostRoot);
|
nodesDestroyNode(pQuery->pPostRoot);
|
||||||
taosMemoryFreeClear(pQuery->pResSchema);
|
taosMemoryFreeClear(pQuery->pResSchema);
|
||||||
|
|
|
@ -35,6 +35,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe
|
||||||
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
|
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
|
||||||
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
|
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
|
||||||
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||||
|
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -3520,6 +3520,10 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (NULL == pSelect->pWindow) {
|
if (NULL == pSelect->pWindow) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
if (pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE &&
|
||||||
|
((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType == TSDB_SYSTEM_TABLE) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "WINDOW");
|
||||||
|
}
|
||||||
pCxt->currClause = SQL_CLAUSE_WINDOW;
|
pCxt->currClause = SQL_CLAUSE_WINDOW;
|
||||||
int32_t code = translateExpr(pCxt, &pSelect->pWindow);
|
int32_t code = translateExpr(pCxt, &pSelect->pWindow);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -5803,6 +5807,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
|
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
STableMeta* pMetaCache = NULL;
|
||||||
|
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision;
|
||||||
|
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery);
|
||||||
|
}
|
||||||
|
taosMemoryFreeClear(pMetaCache);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -5828,15 +5841,60 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
|
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
|
||||||
SMCreateSmaReq createSmaReq = {0};
|
|
||||||
int32_t code = checkCreateSmaIndex(pCxt, pStmt);
|
int32_t code = checkCreateSmaIndex(pCxt, pStmt);
|
||||||
|
pStmt->pReq = taosMemoryCalloc(1, sizeof(SMCreateSmaReq));
|
||||||
|
if (pStmt->pReq == NULL) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildCreateSmaReq(pCxt, pStmt, &createSmaReq);
|
code = buildCreateSmaReq(pCxt, pStmt, pStmt->pReq);
|
||||||
|
}
|
||||||
|
TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t createIntervalFromCreateSmaIndexStmt(SCreateIndexStmt* pStmt, SInterval* pInterval) {
|
||||||
|
pInterval->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
|
||||||
|
pInterval->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
|
||||||
|
pInterval->offset = NULL != pStmt->pOptions->pOffset ? ((SValueNode*)pStmt->pOptions->pOffset)->datum.i : 0;
|
||||||
|
pInterval->sliding = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pInterval->interval;
|
||||||
|
pInterval->slidingUnit = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pInterval->intervalUnit;
|
||||||
|
pInterval->precision = pStmt->pOptions->tsPrecision;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void ** pResRow) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
|
||||||
|
int64_t lastTs = 0;
|
||||||
|
SInterval interval = {0};
|
||||||
|
STranslateContext pCxt = {0};
|
||||||
|
code = initTranslateContext(pParseCxt, NULL, &pCxt);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createIntervalFromCreateSmaIndexStmt(pStmt, &interval);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, &createSmaReq);
|
if (pResRow && pResRow[0]) {
|
||||||
|
lastTs = *(int64_t*)pResRow[0];
|
||||||
|
} else if (interval.interval > 0) {
|
||||||
|
lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision);
|
||||||
|
} else {
|
||||||
|
lastTs = taosGetTimestampMs();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tFreeSMCreateSmaReq(&createSmaReq);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (interval.interval > 0) {
|
||||||
|
pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval);
|
||||||
|
} else {
|
||||||
|
pStmt->pReq->lastTs = lastTs;
|
||||||
|
}
|
||||||
|
code = buildCmdMsg(&pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = setQuery(&pCxt, pQuery);
|
||||||
|
}
|
||||||
|
setRefreshMate(&pCxt, pQuery);
|
||||||
|
destroyTranslateContext(&pCxt);
|
||||||
|
tFreeSMCreateSmaReq(pStmt->pReq);
|
||||||
|
taosMemoryFreeClear(pStmt->pReq);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6989,7 +7047,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
|
static int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
|
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -172,6 +172,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
||||||
return "%s function is not supported in group query";
|
return "%s function is not supported in group query";
|
||||||
case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC:
|
case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC:
|
||||||
return "%s function is not supported in system table query";
|
return "%s function is not supported in system table query";
|
||||||
|
case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED:
|
||||||
|
return "%s is not supported in system table query";
|
||||||
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
|
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
|
||||||
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
|
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
|
||||||
case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN:
|
case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN:
|
||||||
|
|
|
@ -227,6 +227,8 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pRes
|
||||||
case QUERY_NODE_CREATE_STREAM_STMT:
|
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||||
code = translatePostCreateStream(pCxt, pQuery, pResRow);
|
code = translatePostCreateStream(pCxt, pQuery, pResRow);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_CREATE_INDEX_STMT:
|
||||||
|
code = translatePostCreateSmaIndex(pCxt, pQuery, pResRow);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -542,6 +542,18 @@ TEST_F(ParserInitialCTest, createSmaIndex) {
|
||||||
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||||
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT);
|
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT);
|
||||||
SMCreateSmaReq req = {0};
|
SMCreateSmaReq req = {0};
|
||||||
|
ASSERT_TRUE(pQuery->pPrevRoot);
|
||||||
|
ASSERT_EQ(QUERY_NODE_SELECT_STMT, nodeType(pQuery->pPrevRoot));
|
||||||
|
|
||||||
|
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
|
||||||
|
SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo));
|
||||||
|
if (NULL == pCmdMsg) FAIL();
|
||||||
|
pCmdMsg->msgType = TDMT_MND_CREATE_SMA;
|
||||||
|
pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq);
|
||||||
|
pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen);
|
||||||
|
if (!pCmdMsg->pMsg) FAIL();
|
||||||
|
tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq);
|
||||||
|
((SQuery*)pQuery)->pCmdMsg = pCmdMsg;
|
||||||
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
||||||
|
|
||||||
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
||||||
|
|
|
@ -291,4 +291,13 @@ TEST_F(ParserInitialDTest, dropUser) {
|
||||||
run("DROP USER wxy");
|
run("DROP USER wxy");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ParserInitialDTest, IntervalOnSysTable) {
|
||||||
|
login("root");
|
||||||
|
run("SELECT count('reboot_time') FROM information_schema.ins_dnodes interval(14m) sliding(9m)",
|
||||||
|
TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, PARSER_STAGE_TRANSLATE);
|
||||||
|
|
||||||
|
run("SELECT count('create_time') FROM information_schema.ins_qnodes interval(14m) sliding(9m)",
|
||||||
|
TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, PARSER_STAGE_TRANSLATE);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ParserTest
|
} // namespace ParserTest
|
||||||
|
|
|
@ -441,6 +441,16 @@ class PlannerTestBaseImpl {
|
||||||
pCxt->topicQuery = true;
|
pCxt->topicQuery = true;
|
||||||
} else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) {
|
} else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) {
|
||||||
SMCreateSmaReq req = {0};
|
SMCreateSmaReq req = {0};
|
||||||
|
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
|
||||||
|
SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo));
|
||||||
|
if (NULL == pCmdMsg) FAIL();
|
||||||
|
pCmdMsg->msgType = TDMT_MND_CREATE_SMA;
|
||||||
|
pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq);
|
||||||
|
pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen);
|
||||||
|
if (!pCmdMsg->pMsg) FAIL();
|
||||||
|
tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq);
|
||||||
|
((SQuery*)pQuery)->pCmdMsg = pCmdMsg;
|
||||||
|
|
||||||
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
|
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
|
||||||
g_mockCatalogService->createSmaIndex(&req);
|
g_mockCatalogService->createSmaIndex(&req);
|
||||||
nodesStringToNode(req.ast, &pCxt->pAstRoot);
|
nodesStringToNode(req.ast, &pCxt->pAstRoot);
|
||||||
|
|
|
@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
|
|
||||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||||
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
|
if (pStreamTask == NULL) {
|
||||||
|
qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed",
|
||||||
// todo handle stream task is dropped here
|
pTask->id.idStr, pTask->streamTaskId.taskId);
|
||||||
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||||
|
@ -377,7 +381,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// update the scan data range for source task.
|
// update the scan data range for source task.
|
||||||
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
||||||
", status:%s, sched-status:%d",
|
", status:%s, sched-status:%d",
|
||||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||||
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||||
} else {
|
} else {
|
||||||
|
@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
ASSERT(batchSize == 0);
|
ASSERT(batchSize == 0);
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
int32_t code = streamTransferStateToStreamTask(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
|
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
|
||||||
int32_t code = streamExecForAll(pTask);
|
int32_t code = streamExecForAll(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) { // todo this status shoudl be removed
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,7 +233,11 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
tdbBtcOpen(&btc, pBt, pTxn);
|
tdbBtcOpen(&btc, pBt, pTxn);
|
||||||
|
/*
|
||||||
|
btc.coder.ofps = taosArrayInit(8, sizeof(SPage *));
|
||||||
|
// btc.coder.ofps = taosArrayInit(8, sizeof(SPgno));
|
||||||
|
//pBtc->coder.ofps = taosArrayInit(8, sizeof(SPage *));
|
||||||
|
*/
|
||||||
tdbTrace("tdb delete, btc: %p, pTxn: %p", &btc, pTxn);
|
tdbTrace("tdb delete, btc: %p, pTxn: %p", &btc, pTxn);
|
||||||
|
|
||||||
// move the cursor
|
// move the cursor
|
||||||
|
@ -254,7 +258,18 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
|
||||||
tdbBtcClose(&btc);
|
tdbBtcClose(&btc);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
SArray *ofps = btc.coder.ofps;
|
||||||
|
if (ofps) {
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
|
||||||
|
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
|
||||||
|
tdbPagerInsertFreePage(btc.pBt->pPager, ofp, btc.pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(ofps);
|
||||||
|
btc.coder.ofps = NULL;
|
||||||
|
}
|
||||||
|
*/
|
||||||
tdbBtcClose(&btc);
|
tdbBtcClose(&btc);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -563,6 +578,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// copy the parent key out if child pages are not leaf page
|
// copy the parent key out if child pages are not leaf page
|
||||||
|
// childNotLeaf = !(TDB_BTREE_PAGE_IS_LEAF(pOlds[0]) || TDB_BTREE_PAGE_IS_OVFL(pOlds[0]));
|
||||||
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
|
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
|
||||||
if (childNotLeaf) {
|
if (childNotLeaf) {
|
||||||
for (int i = 0; i < nOlds; i++) {
|
for (int i = 0; i < nOlds; i++) {
|
||||||
|
@ -592,7 +608,30 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
|
||||||
for (int i = 0; i < nOlds; i++) {
|
for (int i = 0; i < nOlds; i++) {
|
||||||
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
|
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
|
||||||
if (sIdx < nCells) {
|
if (sIdx < nCells) {
|
||||||
|
bool destroyOfps = false;
|
||||||
|
if (!childNotLeaf) {
|
||||||
|
if (!pParent->pPager->ofps) {
|
||||||
|
pParent->pPager->ofps = taosArrayInit(8, sizeof(SPage *));
|
||||||
|
destroyOfps = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tdbPageDropCell(pParent, sIdx, pTxn, pBt);
|
tdbPageDropCell(pParent, sIdx, pTxn, pBt);
|
||||||
|
|
||||||
|
if (!childNotLeaf) {
|
||||||
|
SArray *ofps = pParent->pPager->ofps;
|
||||||
|
if (ofps) {
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
|
||||||
|
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
|
||||||
|
tdbPagerInsertFreePage(pParent->pPager, ofp, pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (destroyOfps) {
|
||||||
|
taosArrayDestroy(ofps);
|
||||||
|
pParent->pPager->ofps = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
((SIntHdr *)pParent->pData)->pgno = 0;
|
((SIntHdr *)pParent->pData)->pgno = 0;
|
||||||
}
|
}
|
||||||
|
@ -861,6 +900,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
|
||||||
if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) {
|
if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) {
|
||||||
((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno;
|
((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tdbPagerInsertFreePage(pBt->pPager, pNews[0], pTxn);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
|
@ -870,6 +911,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
|
||||||
}
|
}
|
||||||
|
|
||||||
for (pageIdx = 0; pageIdx < nOlds; ++pageIdx) {
|
for (pageIdx = 0; pageIdx < nOlds; ++pageIdx) {
|
||||||
|
if (pageIdx >= nNews) {
|
||||||
|
tdbPagerInsertFreePage(pBt->pPager, pOlds[pageIdx], pTxn);
|
||||||
|
}
|
||||||
tdbPagerReturnPage(pBt->pPager, pOlds[pageIdx], pTxn);
|
tdbPagerReturnPage(pBt->pPager, pOlds[pageIdx], pTxn);
|
||||||
}
|
}
|
||||||
for (; pageIdx < nNews; ++pageIdx) {
|
for (; pageIdx < nNews; ++pageIdx) {
|
||||||
|
@ -1311,7 +1355,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
if (pDecoder->ofps) {
|
||||||
|
taosArrayPush(pDecoder->ofps, &ofp);
|
||||||
|
}
|
||||||
|
*/
|
||||||
ofpCell = tdbPageGetCell(ofp, 0);
|
ofpCell = tdbPageGetCell(ofp, 0);
|
||||||
|
|
||||||
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
|
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
|
||||||
|
@ -1346,11 +1394,17 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
|
||||||
int lastKeyPageSpace = 0;
|
int lastKeyPageSpace = 0;
|
||||||
// load left key & val to ovpages
|
// load left key & val to ovpages
|
||||||
while (pgno != 0) {
|
while (pgno != 0) {
|
||||||
|
tdbTrace("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p", pTxn, pgno, pCell);
|
||||||
|
// printf("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p\n", pTxn, pgno, pCell);
|
||||||
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
|
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
if (pDecoder->ofps) {
|
||||||
|
taosArrayPush(pDecoder->ofps, &ofp);
|
||||||
|
}
|
||||||
|
*/
|
||||||
ofpCell = tdbPageGetCell(ofp, 0);
|
ofpCell = tdbPageGetCell(ofp, 0);
|
||||||
|
|
||||||
int lastKeyPage = 0;
|
int lastKeyPage = 0;
|
||||||
|
@ -1518,8 +1572,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
|
||||||
|
|
||||||
if (pPage->vLen == TDB_VARIANT_LEN) {
|
if (pPage->vLen == TDB_VARIANT_LEN) {
|
||||||
if (!leaf) {
|
if (!leaf) {
|
||||||
tdbError("tdb/btree-cell-size: not a leaf page.");
|
tdbError("tdb/btree-cell-size: not a leaf page:%p, pgno:%" PRIu32 ".", pPage, TDB_PAGE_PGNO(pPage));
|
||||||
return -1;
|
// return -1;
|
||||||
}
|
}
|
||||||
nHeader += tdbGetVarInt(pCell + nHeader, &vLen);
|
nHeader += tdbGetVarInt(pCell + nHeader, &vLen);
|
||||||
} else if (leaf) {
|
} else if (leaf) {
|
||||||
|
@ -1559,8 +1613,27 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
|
||||||
bytes = ofp->maxLocal - sizeof(SPgno);
|
bytes = ofp->maxLocal - sizeof(SPgno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SPgno origPgno = pgno;
|
||||||
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
|
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
|
||||||
|
|
||||||
|
ret = tdbPagerWrite(pBt->pPager, ofp);
|
||||||
|
if (ret < 0) {
|
||||||
|
tdbError("failed to write page since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
tdbPageDropCell(ofp, 0, pTxn, pBt);
|
||||||
|
*/
|
||||||
|
// SIntHdr *pIntHdr = (SIntHdr *)(ofp->pData);
|
||||||
|
// pIntHdr->flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
|
||||||
|
// pIntHdr->pgno = 0;
|
||||||
|
// ofp->pPager = NULL;
|
||||||
|
|
||||||
|
SArray *ofps = pPage->pPager->ofps;
|
||||||
|
if (ofps) {
|
||||||
|
taosArrayPush(ofps, &ofp);
|
||||||
|
}
|
||||||
|
|
||||||
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
|
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
|
||||||
|
|
||||||
nLeft -= bytes;
|
nLeft -= bytes;
|
||||||
|
@ -1980,6 +2053,11 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TDB_BTREE_PAGE_IS_OVFL(pBtc->pPage)) {
|
||||||
|
tdbError("tdb/btc-move-downward: should not be a ovfl page here.");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
|
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
|
||||||
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
|
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
|
||||||
pgno = ((SPgno *)pCell)[0];
|
pgno = ((SPgno *)pCell)[0];
|
||||||
|
@ -2068,8 +2146,27 @@ int tdbBtcDelete(SBTC *pBtc) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool destroyOfps = false;
|
||||||
|
if (!pBtc->pPage->pPager->ofps) {
|
||||||
|
pBtc->pPage->pPager->ofps = taosArrayInit(8, sizeof(SPage *));
|
||||||
|
destroyOfps = true;
|
||||||
|
}
|
||||||
|
|
||||||
tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
|
tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
|
||||||
|
|
||||||
|
SArray *ofps = pBtc->pPage->pPager->ofps;
|
||||||
|
if (ofps) {
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
|
||||||
|
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
|
||||||
|
tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (destroyOfps) {
|
||||||
|
taosArrayDestroy(ofps);
|
||||||
|
pBtc->pPage->pPager->ofps = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update interior page or do balance
|
// update interior page or do balance
|
||||||
if (idx == nCells - 1) {
|
if (idx == nCells - 1) {
|
||||||
if (idx) {
|
if (idx) {
|
||||||
|
@ -2113,6 +2210,8 @@ int tdbBtcDelete(SBTC *pBtc) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// printf("tdb/btc-delete: btree balance delete pgno: %d.\n", TDB_PAGE_PGNO(pBtc->pPage));
|
||||||
|
|
||||||
ret = tdbBtreeBalance(pBtc);
|
ret = tdbBtreeBalance(pBtc);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret);
|
tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret);
|
||||||
|
@ -2181,7 +2280,13 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
|
||||||
tdbError("tdb/btc-upsert: page insert/update cell failed with ret: %d.", ret);
|
tdbError("tdb/btc-upsert: page insert/update cell failed with ret: %d.", ret);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
bool destroyOfps = false;
|
||||||
|
if (!pBtc->pPage->pPager->ofps) {
|
||||||
|
pBtc->pPage->pPager->ofps = taosArrayInit(8, sizeof(SPage *));
|
||||||
|
destroyOfps = true;
|
||||||
|
}
|
||||||
|
*/
|
||||||
// check balance
|
// check balance
|
||||||
if (pBtc->pPage->nOverflow > 0) {
|
if (pBtc->pPage->nOverflow > 0) {
|
||||||
ret = tdbBtreeBalance(pBtc);
|
ret = tdbBtreeBalance(pBtc);
|
||||||
|
@ -2190,7 +2295,20 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
SArray *ofps = pBtc->pPage->pPager->ofps;
|
||||||
|
if (ofps) {
|
||||||
|
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
|
||||||
|
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
|
||||||
|
tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (destroyOfps) {
|
||||||
|
taosArrayDestroy(ofps);
|
||||||
|
pBtc->pPage->pPager->ofps = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,11 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret = tdbTbOpen(TDB_FREEDB_NAME, sizeof(SPgno), 0, NULL, pDb, &pDb->pFreeDb, rollback);
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
*ppDb = pDb;
|
*ppDb = pDb;
|
||||||
|
@ -82,6 +87,7 @@ int tdbClose(TDB *pDb) {
|
||||||
if (pDb) {
|
if (pDb) {
|
||||||
#ifdef USE_MAINDB
|
#ifdef USE_MAINDB
|
||||||
if (pDb->pMainDb) tdbTbClose(pDb->pMainDb);
|
if (pDb->pMainDb) tdbTbClose(pDb->pMainDb);
|
||||||
|
if (pDb->pFreeDb) tdbTbClose(pDb->pFreeDb);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {
|
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {
|
||||||
|
|
|
@ -292,7 +292,23 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
|
||||||
*/
|
*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
int tdbPagerCancelDirty(SPager *pPager, SPage *pPage, TXN *pTxn) {
|
||||||
|
SRBTreeNode *pNode = tRBTreeGet(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||||
|
if (pNode) {
|
||||||
|
pPage->isDirty = 0;
|
||||||
|
|
||||||
|
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||||
|
if (pTxn->jPageSet) {
|
||||||
|
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
|
||||||
|
}
|
||||||
|
|
||||||
|
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
*/
|
||||||
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
||||||
SPage *pPage;
|
SPage *pPage;
|
||||||
int ret;
|
int ret;
|
||||||
|
@ -338,10 +354,13 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
||||||
if (pTxn->jPageSet) {
|
if (pTxn->jPageSet) {
|
||||||
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
|
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tdbTrace("tdb/pager-commit: remove page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
|
||||||
|
|
||||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTrace("pager/commit reset dirty tree: %p", &pPager->rbt);
|
tdbTrace("tdb/pager-commit reset dirty tree: %p", &pPager->rbt);
|
||||||
tRBTreeCreate(&pPager->rbt, pageCmpFn);
|
tRBTreeCreate(&pPager->rbt, pageCmpFn);
|
||||||
|
|
||||||
// sync the db file
|
// sync the db file
|
||||||
|
@ -629,6 +648,8 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn);
|
||||||
|
|
||||||
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
||||||
TXN *pTxn) {
|
TXN *pTxn) {
|
||||||
SPage *pPage;
|
SPage *pPage;
|
||||||
|
@ -643,7 +664,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
||||||
// alloc new page
|
// alloc new page
|
||||||
if (pgno == 0) {
|
if (pgno == 0) {
|
||||||
loadPage = 0;
|
loadPage = 0;
|
||||||
ret = tdbPagerAllocPage(pPager, &pgno);
|
ret = tdbPagerAllocPage(pPager, &pgno, pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
|
tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -695,23 +716,86 @@ void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
|
||||||
// TDB_PAGE_PGNO(pPage), pPage);
|
// TDB_PAGE_PGNO(pPage), pPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
|
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
|
||||||
// TODO: Allocate a page from the free list
|
int code = 0;
|
||||||
|
SPgno pgno = TDB_PAGE_PGNO(pPage);
|
||||||
|
|
||||||
|
// memset(pPage->pData, 0, pPage->pageSize);
|
||||||
|
tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno);
|
||||||
|
// printf("tdb/insert-free-page: tbc recycle page: %d.\n", pgno);
|
||||||
|
code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
|
||||||
|
if (code < 0) {
|
||||||
|
tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPage->pPager = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) {
|
||||||
|
int code = 0;
|
||||||
|
TBC *pCur;
|
||||||
|
|
||||||
|
if (!pPager->pEnv->pFreeDb) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, pTxn);
|
||||||
|
if (code < 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveToFirst(pCur);
|
||||||
|
if (code) {
|
||||||
|
tdbError("tdb/remove-free-page: moveto first failed with ret: %d.", code);
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pKey = NULL;
|
||||||
|
int nKey = 0;
|
||||||
|
|
||||||
|
code = tdbTbcGet(pCur, (const void **)&pKey, &nKey, NULL, NULL);
|
||||||
|
if (code < 0) {
|
||||||
|
// tdbError("tdb/remove-free-page: tbc get failed with ret: %d.", code);
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pPgno = *(SPgno *)pKey;
|
||||||
|
tdbTrace("tdb/remove-free-page: tbc get page: %d.", *pPgno);
|
||||||
|
// printf("tdb/remove-free-page: tbc get page: %d.\n", *pPgno);
|
||||||
|
|
||||||
|
code = tdbTbcDelete(pCur);
|
||||||
|
if (code < 0) {
|
||||||
|
tdbError("tdb/remove-free-page: tbc delete failed with ret: %d.", code);
|
||||||
|
tdbTbcClose(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
|
||||||
|
// Allocate a page from the free list
|
||||||
|
return tdbPagerRemoveFreePage(pPager, ppgno, pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
|
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
|
||||||
*ppgno = ++pPager->dbFileSize;
|
*ppgno = ++pPager->dbFileSize;
|
||||||
|
// tdbError("tdb/alloc-new-page: %d.", *ppgno);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
|
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
*ppgno = 0;
|
*ppgno = 0;
|
||||||
|
|
||||||
// Try to allocate from the free list of the pager
|
// Try to allocate from the free list of the pager
|
||||||
ret = tdbPagerAllocFreePage(pPager, ppgno);
|
ret = tdbPagerAllocFreePage(pPager, ppgno, pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,13 +131,14 @@ typedef struct SBtInfo {
|
||||||
#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL)
|
#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int kLen;
|
int kLen;
|
||||||
u8 *pKey;
|
u8 *pKey;
|
||||||
int vLen;
|
int vLen;
|
||||||
u8 *pVal;
|
u8 *pVal;
|
||||||
SPgno pgno;
|
SPgno pgno;
|
||||||
u8 *pBuf;
|
u8 *pBuf;
|
||||||
u8 freeKV;
|
u8 freeKV;
|
||||||
|
SArray *ofps;
|
||||||
} SCellDecoder;
|
} SCellDecoder;
|
||||||
|
|
||||||
struct SBTC {
|
struct SBTC {
|
||||||
|
@ -198,9 +199,10 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn);
|
||||||
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
||||||
TXN *pTxn);
|
TXN *pTxn);
|
||||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
||||||
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
|
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
||||||
int tdbPagerRestoreJournals(SPager *pPager);
|
// int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
|
||||||
int tdbPagerRollback(SPager *pPager);
|
int tdbPagerRestoreJournals(SPager *pPager);
|
||||||
|
int tdbPagerRollback(SPager *pPager);
|
||||||
|
|
||||||
// tdbPCache.c ====================================
|
// tdbPCache.c ====================================
|
||||||
#define TDB_PCACHE_PAGE \
|
#define TDB_PCACHE_PAGE \
|
||||||
|
@ -373,6 +375,7 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
|
||||||
|
|
||||||
#ifdef USE_MAINDB
|
#ifdef USE_MAINDB
|
||||||
#define TDB_MAINDB_NAME "main.tdb"
|
#define TDB_MAINDB_NAME "main.tdb"
|
||||||
|
#define TDB_FREEDB_NAME "_free.db"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
struct STDB {
|
struct STDB {
|
||||||
|
@ -386,6 +389,7 @@ struct STDB {
|
||||||
SPager **pgrHash;
|
SPager **pgrHash;
|
||||||
#ifdef USE_MAINDB
|
#ifdef USE_MAINDB
|
||||||
TTB *pMainDb;
|
TTB *pMainDb;
|
||||||
|
TTB *pFreeDb;
|
||||||
#endif
|
#endif
|
||||||
int64_t txnId;
|
int64_t txnId;
|
||||||
};
|
};
|
||||||
|
@ -403,6 +407,7 @@ struct SPager {
|
||||||
SRBTree rbt;
|
SRBTree rbt;
|
||||||
// u8 inTran;
|
// u8 inTran;
|
||||||
TXN *pActiveTxn;
|
TXN *pActiveTxn;
|
||||||
|
SArray *ofps;
|
||||||
SPager *pNext; // used by TDB
|
SPager *pNext; // used by TDB
|
||||||
SPager *pHashNext; // used by TDB
|
SPager *pHashNext; // used by TDB
|
||||||
#ifdef USE_MAINDB
|
#ifdef USE_MAINDB
|
||||||
|
|
|
@ -14,3 +14,7 @@ target_link_libraries(tdbExOVFLTest tdb gtest gtest_main)
|
||||||
add_executable(tdbPageDefragmentTest "tdbPageDefragmentTest.cpp")
|
add_executable(tdbPageDefragmentTest "tdbPageDefragmentTest.cpp")
|
||||||
target_link_libraries(tdbPageDefragmentTest tdb gtest gtest_main)
|
target_link_libraries(tdbPageDefragmentTest tdb gtest gtest_main)
|
||||||
|
|
||||||
|
# page recycling testing
|
||||||
|
add_executable(tdbPageRecycleTest "tdbPageRecycleTest.cpp")
|
||||||
|
target_link_libraries(tdbPageRecycleTest tdb gtest gtest_main)
|
||||||
|
|
||||||
|
|
|
@ -190,6 +190,15 @@ static void insertOfp(void) {
|
||||||
// commit current transaction
|
// commit current transaction
|
||||||
tdbCommit(pEnv, txn);
|
tdbCommit(pEnv, txn);
|
||||||
tdbPostCommit(pEnv, txn);
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
|
// TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
|
||||||
|
@ -233,6 +242,13 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
|
||||||
|
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
|
// TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
|
||||||
|
@ -334,6 +350,15 @@ tdbBegin(pEnv, &txn);
|
||||||
// commit current transaction
|
// commit current transaction
|
||||||
tdbCommit(pEnv, txn);
|
tdbCommit(pEnv, txn);
|
||||||
tdbPostCommit(pEnv, txn);
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TEST(tdb_test, DISABLED_simple_insert1) {
|
// TEST(tdb_test, DISABLED_simple_insert1) {
|
||||||
|
@ -407,6 +432,8 @@ TEST(tdb_test, simple_insert1) {
|
||||||
tdbCommit(pEnv, txn);
|
tdbCommit(pEnv, txn);
|
||||||
tdbPostCommit(pEnv, txn);
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
{ // Query the data
|
{ // Query the data
|
||||||
void *pVal = NULL;
|
void *pVal = NULL;
|
||||||
int vLen;
|
int vLen;
|
||||||
|
|
|
@ -0,0 +1,835 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#define ALLOW_FORBID_FUNC
|
||||||
|
#include "os.h"
|
||||||
|
#include "tdb.h"
|
||||||
|
|
||||||
|
#include <shared_mutex>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
#include "tlog.h"
|
||||||
|
|
||||||
|
typedef struct SPoolMem {
|
||||||
|
int64_t size;
|
||||||
|
struct SPoolMem *prev;
|
||||||
|
struct SPoolMem *next;
|
||||||
|
} SPoolMem;
|
||||||
|
|
||||||
|
static SPoolMem *openPool() {
|
||||||
|
SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
|
||||||
|
|
||||||
|
pPool->prev = pPool->next = pPool;
|
||||||
|
pPool->size = 0;
|
||||||
|
|
||||||
|
return pPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void clearPool(SPoolMem *pPool) {
|
||||||
|
SPoolMem *pMem;
|
||||||
|
|
||||||
|
do {
|
||||||
|
pMem = pPool->next;
|
||||||
|
|
||||||
|
if (pMem == pPool) break;
|
||||||
|
|
||||||
|
pMem->next->prev = pMem->prev;
|
||||||
|
pMem->prev->next = pMem->next;
|
||||||
|
pPool->size -= pMem->size;
|
||||||
|
|
||||||
|
taosMemoryFree(pMem);
|
||||||
|
} while (1);
|
||||||
|
|
||||||
|
assert(pPool->size == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void closePool(SPoolMem *pPool) {
|
||||||
|
clearPool(pPool);
|
||||||
|
taosMemoryFree(pPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *poolMalloc(void *arg, size_t size) {
|
||||||
|
void *ptr = NULL;
|
||||||
|
SPoolMem *pPool = (SPoolMem *)arg;
|
||||||
|
SPoolMem *pMem;
|
||||||
|
|
||||||
|
pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
|
||||||
|
if (pMem == NULL) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pMem->size = sizeof(*pMem) + size;
|
||||||
|
pMem->next = pPool->next;
|
||||||
|
pMem->prev = pPool;
|
||||||
|
|
||||||
|
pPool->next->prev = pMem;
|
||||||
|
pPool->next = pMem;
|
||||||
|
pPool->size += pMem->size;
|
||||||
|
|
||||||
|
ptr = (void *)(&pMem[1]);
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void poolFree(void *arg, void *ptr) {
|
||||||
|
SPoolMem *pPool = (SPoolMem *)arg;
|
||||||
|
SPoolMem *pMem;
|
||||||
|
|
||||||
|
pMem = &(((SPoolMem *)ptr)[-1]);
|
||||||
|
|
||||||
|
pMem->next->prev = pMem->prev;
|
||||||
|
pMem->prev->next = pMem->next;
|
||||||
|
pPool->size -= pMem->size;
|
||||||
|
|
||||||
|
taosMemoryFree(pMem);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||||
|
int k1, k2;
|
||||||
|
|
||||||
|
std::string s1((char *)pKey1 + 3, kLen1 - 3);
|
||||||
|
std::string s2((char *)pKey2 + 3, kLen2 - 3);
|
||||||
|
k1 = stoi(s1);
|
||||||
|
k2 = stoi(s2);
|
||||||
|
|
||||||
|
if (k1 < k2) {
|
||||||
|
return -1;
|
||||||
|
} else if (k1 > k2) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
|
||||||
|
int mlen;
|
||||||
|
int cret;
|
||||||
|
|
||||||
|
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
|
||||||
|
|
||||||
|
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
|
||||||
|
cret = memcmp(pKey1, pKey2, mlen);
|
||||||
|
if (cret == 0) {
|
||||||
|
if (keyLen1 < keyLen2) {
|
||||||
|
cret = -1;
|
||||||
|
} else if (keyLen1 > keyLen2) {
|
||||||
|
cret = 1;
|
||||||
|
} else {
|
||||||
|
cret = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
|
||||||
|
TDB *pEnv = NULL;
|
||||||
|
|
||||||
|
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0);
|
||||||
|
if (ret) {
|
||||||
|
pEnv = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pEnv;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void generateBigVal(char *val, int valLen) {
|
||||||
|
for (int i = 0; i < valLen; ++i) {
|
||||||
|
char c = char(i & 0xff);
|
||||||
|
if (c == 0) {
|
||||||
|
c = 1;
|
||||||
|
}
|
||||||
|
val[i] = c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void insertOfp(void) {
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
// open Env
|
||||||
|
int const pageSize = 4096;
|
||||||
|
int const pageNum = 64;
|
||||||
|
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
|
||||||
|
GTEST_ASSERT_NE(pEnv, nullptr);
|
||||||
|
|
||||||
|
// open db
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||||
|
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
SPoolMem *pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
TXN *txn = NULL;
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
// generate value payload
|
||||||
|
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
char val[32605];
|
||||||
|
int valLen = sizeof(val) / sizeof(val[0]);
|
||||||
|
generateBigVal(val, valLen);
|
||||||
|
|
||||||
|
// insert the generated big data
|
||||||
|
// char const *key = "key1";
|
||||||
|
char const *key = "key123456789";
|
||||||
|
ret = tdbTbInsert(pDb, key, strlen(key) + 1, val, valLen, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void clearDb(char const *db) { taosRemoveDir(db); }
|
||||||
|
|
||||||
|
TEST(TdbPageRecycleTest, DISABLED_TbInsertTest) {
|
||||||
|
// TEST(TdbPageRecycleTest, TbInsertTest) {
|
||||||
|
// ofp inserting
|
||||||
|
clearDb("tdb");
|
||||||
|
insertOfp();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(TdbPageRecycleTest, DISABLED_TbGetTest) {
|
||||||
|
// TEST(TdbPageRecycleTest, TbGetTest) {
|
||||||
|
clearDb("tdb");
|
||||||
|
insertOfp();
|
||||||
|
|
||||||
|
// open Env
|
||||||
|
int const pageSize = 4096;
|
||||||
|
int const pageNum = 64;
|
||||||
|
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
|
||||||
|
GTEST_ASSERT_NE(pEnv, nullptr);
|
||||||
|
|
||||||
|
// open db
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||||
|
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// generate value payload
|
||||||
|
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
char val[32605];
|
||||||
|
int valLen = sizeof(val) / sizeof(val[0]);
|
||||||
|
generateBigVal(val, valLen);
|
||||||
|
|
||||||
|
{ // Query the data
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen;
|
||||||
|
|
||||||
|
// char const *key = "key1";
|
||||||
|
char const *key = "key123456789";
|
||||||
|
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(vLen, valLen);
|
||||||
|
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
|
||||||
|
|
||||||
|
tdbFree(pVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(TdbPageRecycleTest, DISABLED_TbDeleteTest) {
|
||||||
|
// TEST(TdbPageRecycleTest, TbDeleteTest) {
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
taosRemoveDir("tdb");
|
||||||
|
|
||||||
|
// open Env
|
||||||
|
int const pageSize = 4096;
|
||||||
|
int const pageNum = 64;
|
||||||
|
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
|
||||||
|
GTEST_ASSERT_NE(pEnv, nullptr);
|
||||||
|
|
||||||
|
// open db
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
SPoolMem *pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
TXN *txn;
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
// generate value payload
|
||||||
|
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
char val[((4083 - 4 - 3 - 2) + 1) * 2]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
int valLen = sizeof(val) / sizeof(val[0]);
|
||||||
|
generateBigVal(val, valLen);
|
||||||
|
|
||||||
|
{ // insert the generated big data
|
||||||
|
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // query the data
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen;
|
||||||
|
|
||||||
|
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(vLen, valLen);
|
||||||
|
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
|
||||||
|
|
||||||
|
tdbFree(pVal);
|
||||||
|
}
|
||||||
|
/* open to debug committed file
|
||||||
|
tdbCommit(pEnv, &txn);
|
||||||
|
tdbTxnClose(&txn);
|
||||||
|
|
||||||
|
++txnid;
|
||||||
|
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
tdbBegin(pEnv, &txn);
|
||||||
|
*/
|
||||||
|
{ // upsert the data
|
||||||
|
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // query the upserted data
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen;
|
||||||
|
|
||||||
|
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(vLen, strlen("value1"));
|
||||||
|
GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0);
|
||||||
|
|
||||||
|
tdbFree(pVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // delete the data
|
||||||
|
ret = tdbTbDelete(pDb, "key1", strlen("key1"), txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // query the deleted data
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen = -1;
|
||||||
|
|
||||||
|
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
|
||||||
|
ASSERT(ret == -1);
|
||||||
|
GTEST_ASSERT_EQ(ret, -1);
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(vLen, -1);
|
||||||
|
GTEST_ASSERT_EQ(pVal, nullptr);
|
||||||
|
|
||||||
|
tdbFree(pVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(TdbPageRecycleTest, DISABLED_simple_insert1) {
|
||||||
|
// TEST(TdbPageRecycleTest, simple_insert1) {
|
||||||
|
int ret;
|
||||||
|
TDB *pEnv;
|
||||||
|
TTB *pDb;
|
||||||
|
tdb_cmpr_fn_t compFunc;
|
||||||
|
int nData = 1;
|
||||||
|
TXN *txn;
|
||||||
|
int const pageSize = 4096;
|
||||||
|
|
||||||
|
taosRemoveDir("tdb");
|
||||||
|
|
||||||
|
// Open Env
|
||||||
|
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// Create a database
|
||||||
|
compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
{
|
||||||
|
char key[64];
|
||||||
|
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
int64_t poolLimit = 4096; // 1M pool limit
|
||||||
|
SPoolMem *pPool;
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
for (int iData = 1; iData <= nData; iData++) {
|
||||||
|
sprintf(key, "key0");
|
||||||
|
sprintf(val, "value%d", iData);
|
||||||
|
|
||||||
|
// ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
|
||||||
|
// GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// generate value payload
|
||||||
|
int valLen = sizeof(val) / sizeof(val[0]);
|
||||||
|
for (int i = 6; i < valLen; ++i) {
|
||||||
|
char c = char(i & 0xff);
|
||||||
|
if (c == 0) {
|
||||||
|
c = 1;
|
||||||
|
}
|
||||||
|
val[i] = c;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// if pool is full, commit the transaction and start a new one
|
||||||
|
if (pPool->size >= poolLimit) {
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
// start a new transaction
|
||||||
|
clearPool(pPool);
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
{ // Query the data
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen;
|
||||||
|
|
||||||
|
for (int i = 1; i <= nData; i++) {
|
||||||
|
sprintf(key, "key%d", i);
|
||||||
|
// sprintf(val, "value%d", i);
|
||||||
|
|
||||||
|
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(vLen, sizeof(val) / sizeof(val[0]));
|
||||||
|
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
tdbFree(pVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // Iterate to query the DB data
|
||||||
|
TBC *pDBC;
|
||||||
|
void *pKey = NULL;
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen, kLen;
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
ret = tdbTbcOpen(pDb, &pDBC, NULL);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
tdbTbcMoveToFirst(pDBC);
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
|
||||||
|
if (ret < 0) break;
|
||||||
|
|
||||||
|
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
|
||||||
|
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
|
||||||
|
// std::cout << std::endl;
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(count, nData);
|
||||||
|
|
||||||
|
tdbTbcClose(pDBC);
|
||||||
|
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tdbTbDrop(pDb);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void insertDb(int nData) {
|
||||||
|
int ret = 0;
|
||||||
|
TDB *pEnv = NULL;
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc;
|
||||||
|
TXN *txn = NULL;
|
||||||
|
int const pageSize = 4 * 1024;
|
||||||
|
|
||||||
|
// Open Env
|
||||||
|
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// Create a database
|
||||||
|
compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// 1, insert nData kv
|
||||||
|
{
|
||||||
|
char key[64];
|
||||||
|
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
int64_t poolLimit = 4096; // 1M pool limit
|
||||||
|
SPoolMem *pPool;
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
for (int iData = 0; iData < nData; ++iData) {
|
||||||
|
sprintf(key, "key%03d", iData);
|
||||||
|
sprintf(val, "value%03d", iData);
|
||||||
|
|
||||||
|
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
// if pool is full, commit the transaction and start a new one
|
||||||
|
if (pPool->size >= poolLimit) {
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
// start a new transaction
|
||||||
|
clearPool(pPool);
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
// 2, delete nData/2 records
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void deleteDb(int nData) {
|
||||||
|
int ret = 0;
|
||||||
|
TDB *pEnv = NULL;
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc;
|
||||||
|
TXN *txn = NULL;
|
||||||
|
int const pageSize = 4 * 1024;
|
||||||
|
|
||||||
|
// Open Env
|
||||||
|
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// Create a database
|
||||||
|
compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// 2, delete nData/2 records
|
||||||
|
{
|
||||||
|
char key[64];
|
||||||
|
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
int64_t poolLimit = 4096; // 1M pool limit
|
||||||
|
SPoolMem *pPool;
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
for (int iData = 0; iData < nData; iData++) {
|
||||||
|
// if (iData % 2 == 0) continue;
|
||||||
|
|
||||||
|
sprintf(key, "key%03d", iData);
|
||||||
|
sprintf(val, "value%03d", iData);
|
||||||
|
|
||||||
|
{ // delete the data
|
||||||
|
ret = tdbTbDelete(pDb, key, strlen(key), txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
// if pool is full, commit the transaction and start a new one
|
||||||
|
if (pPool->size >= poolLimit) {
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
// start a new transaction
|
||||||
|
clearPool(pPool);
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
||||||
|
|
||||||
|
static const int nDataConst = 256 * 19;
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_seq_insert) {
|
||||||
|
TEST(TdbPageRecycleTest, seq_insert) {
|
||||||
|
clearDb("tdb");
|
||||||
|
insertDb(nDataConst);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_seq_delete) {
|
||||||
|
TEST(TdbPageRecycleTest, seq_delete) { deleteDb(nDataConst); }
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_recycly_insert) {
|
||||||
|
TEST(TdbPageRecycleTest, recycly_insert) { insertDb(nDataConst); }
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp) {
|
||||||
|
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp) {
|
||||||
|
clearDb("tdb");
|
||||||
|
insertOfp();
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void deleteOfp(void) {
|
||||||
|
// open Env
|
||||||
|
int ret = 0;
|
||||||
|
int const pageSize = 4096;
|
||||||
|
int const pageNum = 64;
|
||||||
|
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
|
||||||
|
GTEST_ASSERT_NE(pEnv, nullptr);
|
||||||
|
|
||||||
|
// open db
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
SPoolMem *pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
TXN *txn;
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
{ // delete the data
|
||||||
|
char const *key = "key123456789";
|
||||||
|
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
|
ret = tdbTbDrop(pDb);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_seq_delete_ofp) {
|
||||||
|
TEST(TdbPageRecycleTest, seq_delete_ofp) {
|
||||||
|
deleteOfp();
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_again) {
|
||||||
|
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_again) {
|
||||||
|
insertOfp();
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_nocommit) {
|
||||||
|
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_nocommit) {
|
||||||
|
clearDb("tdb");
|
||||||
|
insertOfp();
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
|
||||||
|
// open Env
|
||||||
|
int ret = 0;
|
||||||
|
int const pageSize = 4096;
|
||||||
|
int const pageNum = 64;
|
||||||
|
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
|
||||||
|
GTEST_ASSERT_NE(pEnv, nullptr);
|
||||||
|
|
||||||
|
// open db
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
SPoolMem *pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
TXN *txn;
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
{ // delete the data
|
||||||
|
char const *key = "key123456789";
|
||||||
|
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1, insert nData kv
|
||||||
|
{
|
||||||
|
int nData = nDataConst;
|
||||||
|
char key[64];
|
||||||
|
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
|
||||||
|
int64_t poolLimit = 4096; // 1M pool limit
|
||||||
|
|
||||||
|
for (int iData = 0; iData < nData; ++iData) {
|
||||||
|
sprintf(key, "key%03d", iData);
|
||||||
|
sprintf(val, "value%03d", iData);
|
||||||
|
|
||||||
|
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
// if pool is full, commit the transaction and start a new one
|
||||||
|
if (pPool->size >= poolLimit) {
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
// start a new transaction
|
||||||
|
clearPool(pPool);
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TEST(TdbPageRecycleTest, DISABLED_recycly_delete_interior_ofp_nocommit) {
|
||||||
|
TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) {
|
||||||
|
clearDb("tdb");
|
||||||
|
|
||||||
|
// open Env
|
||||||
|
int ret = 0;
|
||||||
|
int const pageSize = 4096;
|
||||||
|
int const pageNum = 64;
|
||||||
|
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
|
||||||
|
GTEST_ASSERT_NE(pEnv, nullptr);
|
||||||
|
|
||||||
|
// open db
|
||||||
|
TTB *pDb = NULL;
|
||||||
|
tdb_cmpr_fn_t compFunc = NULL; // tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
SPoolMem *pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
TXN *txn;
|
||||||
|
|
||||||
|
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
char key[1024] = {0};
|
||||||
|
int count = sizeof(key) / sizeof(key[0]);
|
||||||
|
for (int i = 0; i < count - 1; ++i) {
|
||||||
|
key[i] = 'a';
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert n ofp keys to form 2-layer btree
|
||||||
|
{
|
||||||
|
for (int i = 0; i < 7; ++i) {
|
||||||
|
// sprintf(&key[count - 2], "%c", i);
|
||||||
|
key[count - 2] = '0' + i;
|
||||||
|
|
||||||
|
ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
// delete one interior key
|
||||||
|
{
|
||||||
|
sprintf(&key[count - 2], "%c", 2);
|
||||||
|
key[count - 2] = '0' + 2;
|
||||||
|
|
||||||
|
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
// commit current transaction
|
||||||
|
tdbCommit(pEnv, txn);
|
||||||
|
tdbPostCommit(pEnv, txn);
|
||||||
|
|
||||||
|
closePool(pPool);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
system("ls -l ./tdb");
|
||||||
|
}
|
|
@ -568,6 +568,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SELECTED_EXPR, "Invalid SELECTed ex
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table info")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table info")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not allowed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
||||||
|
|
||||||
//planner
|
//planner
|
||||||
|
|
|
@ -129,6 +129,7 @@ sql DROP INDEX sma_index_3 ;
|
||||||
|
|
||||||
print ========== step8
|
print ========== step8
|
||||||
sql drop database if exists db;
|
sql drop database if exists db;
|
||||||
|
sleep 2000
|
||||||
sql create database db duration 300;
|
sql create database db duration 300;
|
||||||
sql use db;
|
sql use db;
|
||||||
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
|
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);
|
||||||
|
|
Loading…
Reference in New Issue